Compare commits

54 Commits

Author SHA1 Message Date
Arseny Sher
811eb6d9b5 refactor args 2025-04-02 12:07:12 +02:00
Arseny Sher
a2a4517b86 more combos 2025-04-02 12:07:12 +02:00
Arseny Sher
0ad1f445e2 ff 2025-04-02 12:07:12 +02:00
Arseny Sher
146a7129de basic joint tests 2025-04-02 12:07:12 +02:00
Arseny Sher
651efef5d3 more tests 2025-04-02 12:07:12 +02:00
Arseny Sher
7e2c678ac4 indent 2025-04-02 12:07:12 +02:00
Arseny Sher
b365ab90bb GetAcknowledgedByQuorumWALPosition 2025-04-02 12:07:12 +02:00
Arseny Sher
a9845e820a votescollected 2025-04-02 12:07:12 +02:00
Arseny Sher
a2cbf64797 greetings 2025-04-02 12:07:12 +02:00
Arseny Sher
5426272f14 mappings 2025-04-02 12:07:12 +02:00
Arseny Sher
edfa610920 basic preps 2025-04-02 12:07:12 +02:00
Arseny Sher
8baa32b90c test framework 2025-04-02 12:07:12 +02:00
Alex Chi Z.
c4fc602115 feat(pageserver): support synthetic size calculation for invisible branches (#11335)
## Problem

ref https://github.com/neondatabase/neon/issues/11279


Imagine we have a branch with 3 snapshots A, B, and C:
```
base---+---+---+---main
        \-A \-B \-C
base=100G, base-A=1G, A-B=1G, B-C=1G, C-main=1G
```
At this point, the synthetic size should be 100+1+1+1+1=104G.

After main is deleted, the structure looks like:
```
base---+---+---+
       \-A \-B \-C
```
If we simply assume main never existed, the size would be calculated as
size(A) + size(B) + size(C), roughly 300GB, which is obviously not what the
user would expect.

The correct way to do this is to assume part of main still exists, that
is to say, set C-main=1G:
```
base---+---+---+main
       \-A \-B \-C
```
And we will get the correct synthetic size of 100G+1+1+1=103G.
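
The arithmetic above can be checked with a small sketch. This is a toy model, not the pageserver's actual synthetic size algorithm: the constant and function names are made up for illustration, sizes are in GB, and "keeping part of main" is modeled as counting the shared history up to the last branchpoint once, which is what the 100+1+1+1 sum implies.

```python
# Toy model of the example above. Sizes are in GB; all names are hypothetical.
BASE = 100
# WAL written between consecutive points on main.
SEGMENTS = {"base-A": 1, "A-B": 1, "B-C": 1, "C-main": 1}


def size_with_main() -> int:
    """Main exists: every segment is shared through main and counted once."""
    return BASE + sum(SEGMENTS.values())


def size_naive_without_main() -> int:
    """Pretend main never existed: A, B and C are unrelated full copies."""
    size_a = BASE + SEGMENTS["base-A"]
    size_b = size_a + SEGMENTS["A-B"]
    size_c = size_b + SEGMENTS["B-C"]
    return size_a + size_b + size_c


def size_with_truncated_main() -> int:
    """Keep main only up to the last branchpoint (C): shared history counted once."""
    return BASE + SEGMENTS["base-A"] + SEGMENTS["A-B"] + SEGMENTS["B-C"]
```

In this model the naive calculation gives 306, which is the "roughly 300GB" case, while the truncated-main calculation gives 103.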


## Summary of changes

* Do not generate gc cutoff point for invisible branches.
* Use the same LSN as the last branchpoint for branch end.
* Remove test_api_handler for mark_invisible.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-01 18:50:58 +00:00
Konstantin Knizhnik
02936b82c5 Fix effective_lsn calculation for prefetch (#11219)
## Problem

See  https://neondb.slack.com/archives/C04DGM6SMTM/p1741594233757489

Consider the following scenario:

1. Backend A wants to prefetch some block B
2. Backend A checks that block B is not present in shared buffer
3. Backend A registers new prefetch request and calls
prefetch_do_request
4. prefetch_do_request calls neon_get_request_lsns
5. neon_get_request_lsns obtains LwLSN for block B
6. Backend B downloads B, updates it, and WAL-logs the change (let's say at Lsn1)
7. Block B is once again evicted from shared buffers, and its LwLSN is set to
Lsn1
8. Backend A obtains current flush LSN, let's say that it is Lsn1
9. Backend A stores Lsn1 as effective_lsn in prefetch slot.
10. Backend A reads page B with LwLSN=Lsn1
11. Backend A finds in prefetch ring response for prefetch request for
block B with effective_lsn=Lsn1, so that it satisfies
neon_prefetch_response_usable condition
12. Backend A uses a stale version of the page!

## Summary of changes

Use `not_modified_since` as `effective_lsn`.
This should not degrade performance, because we store the LwLSN
when it was not found in the LwLSN hash, so if the page is not changed before the
prefetch response arrives, the LwLSN should not change either.
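
The race can be illustrated with a deliberately simplified model. Everything here is an assumption made for illustration: `PrefetchSlot` and `response_usable` are hypothetical stand-ins, and the real `neon_prefetch_response_usable` check in the extension has more conditions than the single comparison shown.

```python
from dataclasses import dataclass


@dataclass
class PrefetchSlot:
    # LSN that the cached response is treated as valid for (hypothetical field).
    effective_lsn: int


def response_usable(slot: PrefetchSlot, not_modified_since: int) -> bool:
    """Simplified check: the cached response must cover the page's last change."""
    return slot.effective_lsn >= not_modified_since


LSN0, LSN1 = 100, 200  # the page was last written at LSN0, then rewritten at LSN1

# Step 5: the prefetch request is built with the old LwLSN, so the returned
# page image may predate the update at LSN1.
request_not_modified_since = LSN0
# Step 8: the flush LSN observed when the request is sent.
flush_lsn_at_send = LSN1

old_slot = PrefetchSlot(effective_lsn=flush_lsn_at_send)           # before the fix
new_slot = PrefetchSlot(effective_lsn=request_not_modified_since)  # after the fix

# Step 10: the later read sees LwLSN = LSN1 for the block.
stale_accepted = response_usable(old_slot, not_modified_since=LSN1)
stale_rejected = not response_usable(new_slot, not_modified_since=LSN1)
```

With the old choice the stale response passes the check; with `not_modified_since` stored as the effective LSN it is rejected and the page has to be requested again.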

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-01 15:48:02 +00:00
Arpad Müller
1fad1abb24 storcon: timeline deletion improvements and fixes (#11334)
This PR contains a bunch of smaller followups and fixes to the original
PR #11058. Most of these implement suggestions from Arseny:

* remove `Queryable, Selectable` from `TimelinePersistence`: they are
not needed.
* no `Arc` around `CancellationToken`: it itself is an arc wrapper
* only schedule deletes instead of scheduling excludes and deletes
* persist and delete deletion ops
* delete rows in timelines table upon tenant and timeline deletion
* set `deleted_at` for timelines we are deleting before we start any
reconciles: this flag will help us later to recognize half-executed
deletions, or when we crashed before we could remove the timeline row
but after we removed the last pending op (handling these situations is
left for later).

Part of #9011
2025-04-01 15:16:33 +00:00
Arpad Müller
016068b966 API spec: add safekeepers field returned by storcon (#11385)
Add an optional `safekeepers` field to `TimelineInfo` which is returned
by the storcon upon timeline creation if the
`--timelines-onto-safekeepers` flag is enabled. It contains the list of
safekeepers chosen.

Other contexts where we return `TimelineInfo` do not contain the
`safekeepers` field. Sadly, I couldn't make this more type safe, as is done
in Rust via `TimelineCreateResponseStorcon`, as there is no way of
flattening or inheritance (and I don't think that duplicating the entire type
for some minor type safety improvements is worth it).

The storcon side has been done in #11058.

Part of https://github.com/neondatabase/cloud/issues/16176
cc https://github.com/neondatabase/cloud/issues/16796
2025-04-01 12:39:10 +00:00
Erik Grinaker
80596feeaa pageserver: invert CompactFlags::NoYield as YieldForL0 (#11382)
## Problem

`CompactFlags::NoYield` was a bit inconvenient, since every caller
except for the background compaction loop should generally set it (e.g.
HTTP API calls, tests, etc). It was also inconsistent with
`CompactionOutcome::YieldForL0`.

## Summary of changes

Invert `CompactFlags::NoYield` as `CompactFlags::YieldForL0`. There
should be no behavioral changes.
2025-04-01 11:43:58 +00:00
Erik Grinaker
225cabd84d pageserver: update upload queue TODOs (#11377)
Update some upload queue TODOs, particularly to track
https://github.com/neondatabase/neon/issues/10283, which I won't get
around to.
2025-04-01 11:38:12 +00:00
Alexey Kondratov
557127550c feat(compute): Add compute_ctl_up metric (#11376)
## Problem

For computes running inside NeonVM, the actual compute image tag is
buried inside the NeonVM spec, and we cannot get it as part of standard
k8s container metrics (it's always an image and a tag of the NeonVM
runner container). The workaround we currently use is to extract the
running computes info from the control plane database with SQL. It has
several drawbacks: i) it's complicated, separate DB per region; ii) it's
slow; iii) it's still an indirect source of info, i.e. k8s state could
be different from what the control plane expects.

## Summary of changes

Add a new `compute_ctl_up` gauge metric with `build_tag` and `status`
labels. It will help us both to get an overview of the tags/versions of
all running computes and to break them down by current status (`empty`,
`running`, `failed`, etc.)

Later, we could introduce low cardinality (no endpoint or compute ids)
streaming aggregates for such metrics, so they will be blazingly fast
and usable for monitoring the fleet-wide state.
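
As a rough illustration of what such a metric looks like on the wire, here is a sketch that renders one sample in the Prometheus text exposition format and counts computes per label pair. The sample value of 1 and the helper names are assumptions (the commit message does not state them), and the tag values in the usage note are placeholders.

```python
from collections import Counter
from typing import Iterable, Tuple


def render_compute_ctl_up(build_tag: str, status: str) -> str:
    """Render one gauge sample; the value 1 follows the usual `*_up` convention."""
    lines = [
        "# TYPE compute_ctl_up gauge",
        f'compute_ctl_up{{build_tag="{build_tag}",status="{status}"}} 1',
    ]
    return "\n".join(lines) + "\n"


def count_by_tag_and_status(computes: Iterable[Tuple[str, str]]) -> Counter:
    """Low-cardinality aggregate: number of computes per (build_tag, status)."""
    return Counter(computes)
```

For example, `count_by_tag_and_status([("tag-a", "running"), ("tag-a", "running"), ("tag-b", "failed")])` groups the fleet by tag and status without any per-endpoint labels.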
2025-04-01 08:51:17 +00:00
Konstantin Knizhnik
cfe3e6d4e1 Remove loop from pageserver_try_receive (#11387)
## Problem

Commit 3da70abfa5 causes a noticeable performance regression (40% in
update-with-prefetch in test_bulk_update):
https://neondb.slack.com/archives/C04BLQ4LW7K/p1742633167580879

## Summary of changes

Remove the loop from pageserver_try_receive so that it fetches no more than
one response. There is still a loop in `pump_prefetch_state`, which can
fetch as many responses as are available.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-03-31 19:49:32 +00:00
Alex Chi Z.
47d47000df fix(pageserver): passthrough lsn lease in storcon API (#11386)
## Problem

part of https://github.com/neondatabase/cloud/issues/23667

## Summary of changes

lsn_lease API can only be used on pageservers. This patch enables
storcon passthrough.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-31 19:16:42 +00:00
Matthias van de Meent
e5b95bc9dc Neon LFC/prefetch: Improve page read handling (#11380)
Previously we had different meanings for the bitmask of vector IOps.
That has now been unified to "bit set = final result, no more
scribbling".

Furthermore, the LFC read path scribbled on pages that were already
read; that's probably not a good thing so that's been fixed too. In
passing, the read path of LFC has been updated to read only the
requested pages into the provided buffers, thus reducing the IO size of
vectorized IOs.

2025-03-31 17:04:00 +00:00
Alex Chi Z.
0ee5bfa2fc fix(pageserver): allow sibling archived branch for detaching (#11383)
## Problem

close https://github.com/neondatabase/neon/issues/11379

## Summary of changes

Remove checks around archived branches for detach v2. I also updated the
comments `ancestor_retain_lsn`.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-31 16:32:55 +00:00
Fedor Dikarev
00bcafe82e chore(ci): upgrade stats action with docker images from ghcr.io (#11378)
## Problem
The current version of the GitHub Workflow Stats action pulls Docker images from
DockerHub, which could be an issue given the new pull limits on the DockerHub
side.

## Summary of changes
Switch to version `v0.2.2`, with docker images hosted on `ghcr.io`
2025-03-31 14:21:07 +00:00
Conrad Ludgate
ed117af73e chore(proxy/tokio-postgres): remove phf from sqlstate and switch to tracing (#11249)
In sqlstate, we have a manual `phf` construction, which is not
explicitly guaranteed to be stable - you're intended to use a build.rs
or the macro to make sure it's constructed correctly each time. This was
inherited from tokio-postgres upstream, which has the same issue
(https://github.com/rust-phf/rust-phf/pull/321#issuecomment-2724521193).

We don't need this encoding of sqlstate, so I've switched it to simply
parse 5 bytes
(https://www.postgresql.org/docs/current/errcodes-appendix.html).
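
To illustrate the idea of parsing the code directly instead of looking it up in a precomputed table, here is a minimal sketch. It is written in Python rather than the crate's Rust, the function names are hypothetical, and it only assumes what the PostgreSQL documentation states: a SQLSTATE is five characters made of digits and upper-case letters, and the first two denote the error class.

```python
def parse_sqlstate(raw: bytes) -> str:
    """Validate a 5-byte SQLSTATE and return it as a string."""
    if len(raw) != 5:
        raise ValueError(f"SQLSTATE must be 5 bytes, got {len(raw)}")
    code = raw.decode("ascii")  # non-ASCII input raises a ValueError subclass
    if not all(c.isdigit() or "A" <= c <= "Z" for c in code):
        raise ValueError(f"invalid SQLSTATE characters: {code!r}")
    return code


def sqlstate_class(code: str) -> str:
    """The first two characters identify the error class."""
    return code[:2]
```

For instance, `sqlstate_class(parse_sqlstate(b"23505"))` yields the class `"23"`.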

While here, I switched out log for tracing.
2025-03-31 12:35:51 +00:00
Konstantin Knizhnik
21a891a06d Fix IS_LOCAL_REL macro (first class has oid=FirstNormalObjectId) (#11369)
## Problem

The IS_LOCAL_REL macro, used in DEBUG_COMPARE_LOCAL mode, uses a greater-than
rather than a greater-or-equal comparison, while the first table is actually
assigned FirstNormalObjectId.

## Summary of changes

Replace strict greater with greater-or-equal comparison.
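
The off-by-one is easy to see in isolation. The sketch below only models the OID comparison; the real macro is C and operates on a relation identifier, and the two function names are hypothetical. The constant is PostgreSQL's `FirstNormalObjectId`.

```python
FIRST_NORMAL_OBJECT_ID = 16384  # PostgreSQL's FirstNormalObjectId


def is_local_rel_old(oid: int) -> bool:
    """Comparison before the fix: misses the very first user object."""
    return oid > FIRST_NORMAL_OBJECT_ID


def is_local_rel_fixed(oid: int) -> bool:
    """Comparison after the fix: includes FirstNormalObjectId itself."""
    return oid >= FIRST_NORMAL_OBJECT_ID
```

Only the boundary value changes behavior: the first user-created object is skipped by the old comparison and included by the fixed one.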

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-03-31 11:16:35 +00:00
Alexander Bayandin
30a7dd630c ruff: enable TC — flake8-type-checking (#11368)
## Problem

`TYPE_CHECKING` is used inconsistently across Python tests.

## Summary of changes
- Update `ruff`: 0.7.0 -> 0.11.2
- Enable TC (flake8-type-checking):
https://docs.astral.sh/ruff/rules/#flake8-type-checking-tc
- (auto)fix all new issues
2025-03-30 18:58:33 +00:00
Erik Grinaker
db5384e1b0 pageserver: remove L0 flush upload wait (#11196)
## Problem

Previously, L0 flushes would wait for uploads, as a simple form of
backpressure. However, this prevented flush pipelining and upload
parallelism. It has since been disabled by default and replaced by L0
compaction backpressure.

Touches https://github.com/neondatabase/cloud/issues/24664.

## Summary of changes

This patch removes L0 flush upload waits, along with the
`l0_flush_wait_upload`. This can't be merged until the setting has been
removed across the fleet.
2025-03-30 13:14:04 +00:00
JC Grünhage
5cb6a4bc8b fix(ci): use the right sha in release PRs (#11365)
## Problem

`github.sha` contains a merge commit of `head` and `base` if we're in a
PR. In release PRs, this makes no sense, because we fast-forward the
`base` branch to contain the changes from `head`.

Even though we correctly use `${{ github.event.pull_request.head.sha ||
github.sha }}` to reference the git commit when building artifacts, we
don't use that when checking out code, because we want to test the merge
of head and base usually. In the case of release PRs, we definitely
always want to test on the head sha though, because we're going to
forward that, and it already has the base sha as a parent, so the merge
would end up with the same tree anyway.

As a side effect, not checking out `${{
github.event.pull_request.head.sha || github.sha }}` also caused
https://github.com/neondatabase/neon/actions/runs/13986389780/job/39173256184#step:6:49
to say `release-tag=release-compute-8187`, while
https://github.com/neondatabase/neon/actions/runs/14084613121/job/39445314780#step:6:48
is talking about `build-tag=release-compute-8186`

## Summary of changes
Run a few things on `github.event.pull_request.head.sha`, if we're in a
release PR.
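
The selection rule can be sketched outside of GitHub Actions expressions. This Python version mirrors the `A && B || C` expression used in the workflow diff further down this page; the function name is hypothetical, and the run kinds are the ones listed in that expression.

```python
import json

# Run kinds treated as release PRs in the workflow expression.
RC_PR_RUN_KINDS = {"storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"}


def pick_sha(run_kind: str, github_event_json: str, github_sha: str) -> str:
    """Use the PR head SHA on release PRs, otherwise fall back to github.sha."""
    event = json.loads(github_event_json)
    if run_kind in RC_PR_RUN_KINDS:
        head_sha = event.get("pull_request", {}).get("head", {}).get("sha")
        if head_sha:  # same fallback behavior as `&& ... ||` in the expression
            return head_sha
    return github_sha
```

On a regular PR the merge commit in `github.sha` is still used, so the merge of head and base keeps being tested there.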
2025-03-28 11:56:24 +00:00
Folke Behrens
1dbf40ee2c proxy: Update redis crate (#11372) 2025-03-28 11:43:52 +00:00
Fedor Dikarev
939354abea chore(ci): pin python base images to sha (#11367)
Similar to how we pin base `debian` images, also pin `python` base
images, so we better cache them and have reproducible builds.
2025-03-27 17:42:28 +00:00
Fedor Dikarev
1d5d168626 impr(ci): use hetzner buckets for cache (#11364)
## Problem
Occasionally, fetching data from the GH cache can be slow: less than
10MB/s, taking 5+ minutes to download the cache:
```
Received 20971520 of 2987085791 (0.7%), 9.9 MBs/sec
Received 50331648 of 2987085791 (1.7%), 15.9 MBs/sec
...
Received 1065353216 of 2987085791 (35.7%), 4.8 MBs/sec
Received 1065353216 of 2987085791 (35.7%), 4.7 MBs/sec
...
```

https://github.com/neondatabase/neon/actions/runs/13956437454/job/39068664599#step:7:17

As a result, fetching the cache can take even longer than the build itself.

## Summary of changes
Switch to caches that are closer to the runners and provide a
stable throughput of about 70-80MB/s.
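
A back-of-the-envelope calculation shows the difference. The cache size is taken from the log above; treating 1 MB as 10^6 bytes and picking 5 MB/s and 75 MB/s as representative rates are assumptions.

```python
CACHE_SIZE_BYTES = 2_987_085_791  # total size reported in the download log


def download_seconds(size_bytes: int, mb_per_sec: float) -> float:
    """Time to download at a constant rate, with 1 MB taken as 10**6 bytes."""
    return size_bytes / (mb_per_sec * 1_000_000)


slow = download_seconds(CACHE_SIZE_BYTES, 5)   # degraded GH cache throughput
fast = download_seconds(CACHE_SIZE_BYTES, 75)  # middle of the 70-80MB/s range
```

At the degraded rate the download takes roughly ten minutes, while at the new rate it takes well under a minute.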
2025-03-27 11:11:45 +00:00
Folke Behrens
b40dd54732 compute-node: Add some debugging tools to image (#11352)
## Problem

Some useful debugging tools are missing from the compute image and
sometimes it's impossible to install them because memory is tightly
packed.

## Summary of changes

Add the following tools: iproute2, lsof, screen, tcpdump.
The other changes come from sorting the packages alphabetically.

```bash
$ docker image inspect ghcr.io/neondatabase/vm-compute-node-v16:7555 | jaq '.[0].Size'
1389759645
$ docker image inspect ghcr.io/neondatabase/vm-compute-node-v16:14083125313 | jaq '.[0].Size'
1396051101
$ echo $((1396051101 - 1389759645))
6291456
```
2025-03-27 11:09:27 +00:00
Folke Behrens
4bb7087d4d proxy: Fix some clippy warnings coming in next versions (#11359) 2025-03-26 10:50:16 +00:00
Arpad Müller
5f3551e405 Add "still waiting for task" for slow shutdowns (#11351)
To help with narrowing down
https://github.com/neondatabase/cloud/issues/26362, we make the case
where we are waiting for the shutdown of a specific task more noisy (in the
case of that issue, the `gc_loop`).
2025-03-24 17:29:44 +00:00
Anastasia Lubennikova
3e5884ff01 Revert "feat(compute_ctl): allow to change audit_log_level for existing (#11308)" (#11343)

This reverts commit e5aef3747c.

The logic of this commit was incorrect:
enabling audit requires a restart of the compute,
because audit extensions use shared_preload_libraries.
So it cannot be done in the configuration phase
and requires an endpoint restart instead.
2025-03-21 18:09:34 +00:00
Vlad Lazar
9fc7c22cc9 storcon: add use_local_compute_notifications flag (#11333)
## Problem

While working on bulk import, I want to use the `control-plane-url` flag
for a different request.
Currently, the local compute hook is used whenever no control plane is
specified in the config.
My test requires local compute notifications and a configured
`control-plane-url` which isn't supported.

## Summary of changes

Add a `use-local-compute-notifications` flag. When this is set, we use
the local flow regardless of other config values.
It's enabled by default in neon_local and disabled by default in all
other envs. I had to turn the flag off in tests
that wish to bypass the local flow, but that's expected.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-03-21 15:31:06 +00:00
Folke Behrens
23ad228310 pgxn: Increase the pageserver response timeout a bit (#11339)
Increase the PS response timeout slightly but noticeably,
so it does not coincide with the default TCP_RTO_MAX.
2025-03-21 14:21:53 +00:00
Dmitrii Kovalkov
aeb53fea94 storage: support multiple SSL CA certificates (#11341)
## Problem
- We need to support multiple SSL CA certificates for graceful root CA
certificate rotation.
- Closes: https://github.com/neondatabase/cloud/issues/25971

## Summary of changes
- Parses `ssl_ca_file` as a pem bundle, which may contain multiple
certificates. Single pem cert is a valid pem bundle, so the change is
backward compatible.
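
Why a single certificate is also a valid bundle can be shown with a small sketch. It is written in Python rather than the storage code's Rust, the function name is hypothetical, and it only splits the text on PEM markers without validating the certificate contents.

```python
import re
from typing import List

# One PEM certificate block, including its BEGIN/END markers.
PEM_CERT_RE = re.compile(
    r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----",
    re.DOTALL,
)


def split_pem_bundle(bundle: str) -> List[str]:
    """Return every certificate block found in the bundle, in order."""
    return PEM_CERT_RE.findall(bundle)
```

A file with one certificate yields a one-element list, and a file with several concatenated certificates yields one element per certificate, so existing single-certificate files keep working.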
2025-03-21 13:43:38 +00:00
Dmitrii Kovalkov
0f367cb665 storcon: reuse reqwest http client (#11327)
## Problem

- Part of https://github.com/neondatabase/neon/issues/11113
- Building a new `reqwest::Client` for every request is expensive
because it parses CA certs under the hood. It's noticeable in storcon's
flamegraph.

## Summary of changes
- Reuse one `reqwest::Client` for all API calls to avoid parsing CA
certificates every time.
2025-03-21 11:48:22 +00:00
John Spray
76088c16d2 storcon: reproduce shard split issue (#11290)
## Problem

Issue https://github.com/neondatabase/neon/issues/11254 describes a case
where restart during a shard split can result in a bad end state in the
database.

## Summary of changes

- Add a reproducer for the issue
- Tighten an existing safety check around updated row counts in
complete_shard_split
2025-03-21 08:48:56 +00:00
John Spray
0d99609870 docs: storage controller retro-RFC (#11218)
## Problem

Various aspects of the controller's job are already described in RFCs,
but the overall service didn't have an RFC that records design tradeoffs
and the top level structure.

## Summary of changes

- Add a retrospective RFC that should be useful for anyone understanding
storage controller functionality
2025-03-21 08:32:11 +00:00
Alex Chi Z.
bae9b9acdc feat(pageserver): persist timeline invisible flag (#11331)
## Problem

part of https://github.com/neondatabase/neon/issues/11279

## Summary of changes

The invisible flag is used to exclude a timeline from synthetic size
calculation. For the first step, let's persist this flag. Most of the
code follows the `is_archived` modification flow.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-20 18:39:08 +00:00
Nikita Kalyanov
53f54ba37a chore: expose detach_v2 (#11325)
We need this exposed in the spec to use it in cplane. Extracted from
https://github.com/neondatabase/cloud/pull/26167

2025-03-20 18:04:17 +00:00
Dmitrii Kovalkov
28fc051dcc storage: live ssl certificate reload (#11309)
## Problem
SSL certs are loaded only during start up. It doesn't allow the rotation
of short-lived certificates without server restart.

- Closes: https://github.com/neondatabase/cloud/issues/25525

## Summary of changes
- Implement `ReloadingCertificateResolver` which reloads certificates
from disk periodically.
2025-03-20 16:26:27 +00:00
Folke Behrens
d0102a473a pgxn: Include local port in no-response log messages (#11321)
## Problem

Now that stuck connections are terminated quickly, it's hard to
find the right port from the pid to correlate the connection
with the one seen on the pageserver side.

## Summary of changes

Call getsockname() and include the local port number in the
no-response-from-pageserver log messages.
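
The same idea in a few lines of Python, as an illustration only: the extension itself is C and calls `getsockname()` on the libpq socket, and the helper names and log wording below are hypothetical.

```python
import socket


def local_port(conn: socket.socket) -> int:
    """Local port of a connected TCP socket, as reported by getsockname()."""
    return conn.getsockname()[1]


def describe_no_response(conn: socket.socket, pid: int) -> str:
    """Hypothetical log line that includes the local port for correlation."""
    return f"no response from pageserver (pid={pid}, local port={local_port(conn)})"
```

The local port on the client matches the peer port the server sees for that connection, which is what makes the correlation possible.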
2025-03-20 16:06:00 +00:00
Alex Chi Z.
78502798ae feat(compute_ctl): pass compute type to pageserver with pg_options (#11287)
## Problem

second try of https://github.com/neondatabase/neon/pull/11185, part of
https://github.com/neondatabase/cloud/issues/24706

## Summary of changes

Tristan reminded me of the `options` field of the pg wire protocol,
which can be used to pass configurations. This patch adds the parsing on
the pageserver side, and supplies `neon.endpoint_type` as part of the
`options`.
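
A minimal sketch of such parsing, assuming the setting arrives in the conventional `-c name=value` form of the `options` startup parameter. The real parser lives on the pageserver side and is written in Rust; the function name and the `primary` value in the usage note are placeholders, and escaped spaces are not handled here.

```python
from typing import Dict


def parse_pg_options(options: str) -> Dict[str, str]:
    """Collect `-c name=value` settings from a pg `options` string."""
    tokens = options.split()
    settings: Dict[str, str] = {}
    i = 0
    while i < len(tokens):
        tok = tokens[i]
        if tok == "-c" and i + 1 < len(tokens):
            pair = tokens[i + 1]  # form: -c name=value
            i += 2
        elif tok.startswith("-c") and len(tok) > 2:
            pair = tok[2:]  # form: -cname=value
            i += 1
        else:
            i += 1  # ignore anything this sketch does not understand
            continue
        name, sep, value = pair.partition("=")
        if sep:
            settings[name] = value
    return settings
```

For example, `parse_pg_options("-c neon.endpoint_type=primary")` returns a dictionary with the single key `neon.endpoint_type`.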

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-20 15:48:40 +00:00
Erik Grinaker
65d690b21d storcon: add repeated auto-splits and initial splits (#11122)
## Problem

Currently, we only split tenants into 8 shards once, at the 64 GB split
threshold. For very large tenants, we need to keep splitting to avoid
huge shards. And we also want to eagerly split at a lower threshold to
improve throughput during initial ingestion.

See
https://github.com/neondatabase/cloud/issues/22532#issuecomment-2706215907
for details.

Touches https://github.com/neondatabase/cloud/issues/22532.
Requires #11157.

## Summary of changes

This adds parameters and logic to enable repeated splits when a tenant's
largest timeline divided by shard count exceeds `split_threshold`, as
well as eager initial splits at a lower threshold to speed up initial
ingestion. The default parameters are all set such that they retain the
current behavior in production (only split into 8 shards once, at 64
GB).

* `split_threshold` now specifies a maximum shard size. When a shard
exceeds it, all tenant shards are split by powers of 2 such that all
tenant shards fall below `split_threshold`. Disabled by default, like
today.
* Add `max_split_shards` to specify a max shard count for autosplits.
Defaults to 8 to retain current behavior.
* Add `initial_split_threshold` and `initial_split_shards` to specify a
threshold and target count for eager splits of unsharded tenants.
Defaults to 64 GB and 8 shards to retain current production behavior.

Because this PR sets `initial_split_threshold` to 64 GB by default, it
has the effect of enabling autosplits by default. This was not the case
previously, since `split_threshold` defaults to None, but it is already
enabled across production and staging. This is temporary until we
complete the production rollout.

For more details, see code comments.

This must wait until #11157 has been deployed to Pageservers.

Once this has been deployed to production, we plan to change the
parameters to:

* `split-threshold`: 256 GB
* `initial-split-threshold`: 16 GB
* `initial-split-shards`: 4
* `max-split-shards`: 16

The final split points will thus be:

* Start: 1 shard
* 16 GB: 4 shards
* 1 TB: 8 shards
* 2 TB: 16 shards
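
The split points listed above can be reproduced with a small sketch. This is not the storage controller's implementation: the class and function names are hypothetical, sizes are in GB, the size is the tenant's largest timeline, and the boundary handling (`>=` rather than `>`) is an assumption chosen so that the thresholds match the list.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SplitParams:
    # Planned production values from the list above, in GB.
    split_threshold: Optional[int] = 256
    max_split_shards: int = 16
    initial_split_threshold: Optional[int] = 16
    initial_split_shards: int = 4


def target_shard_count(size_gb: float, current_shards: int, p: SplitParams) -> int:
    """Shard count a tenant should have, given its largest timeline size."""
    shards = current_shards
    # Eager initial split of an unsharded tenant.
    if (
        shards == 1
        and p.initial_split_threshold is not None
        and size_gb >= p.initial_split_threshold
    ):
        shards = max(shards, p.initial_split_shards)
    # Repeated splits: double until shards are below the threshold or the cap is hit.
    if p.split_threshold is not None:
        while size_gb / shards >= p.split_threshold and shards * 2 <= p.max_split_shards:
            shards *= 2
    return shards
```

With these parameters a 16 GB unsharded tenant goes to 4 shards, a 1 TB tenant with 4 shards goes to 8, and a 2 TB tenant with 8 shards goes to 16, after which `max_split_shards` stops further splits.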

We will then change the default settings to be disabled by default.

---------

Co-authored-by: John Spray <john@neon.tech>
2025-03-20 15:43:57 +00:00
Arpad Müller
5dd60933d3 Mark min_readable_lsn as required in the pageserver API spec (#11324)
Fully applies the changes of
https://github.com/neondatabase/cloud/pull/25233 to neon.git. The field
is always present in the Rust struct definition, so it can be marked as
required.

cc #10707
2025-03-20 15:25:09 +00:00
Gleb Novikov
2065074559 fast_import: put job status to s3 (#11284)
## Problem

The `fast_import` binary runs inside neonvms, which do not currently
support proper `kubectl describe logs`. There are a bunch of other
caveats as well: https://github.com/neondatabase/autoscaling/issues/1320

Anyway, we needed a signal of whether the job finished successfully or not, and if
not, at least some error message for the cplane operation. After [a
short
discussion](https://neondb.slack.com/archives/C07PG8J1L0P/p1741954251813609),
we concluded that an s3 object is the most convenient option at the moment.

## Summary of changes

If `s3_prefix` is provided to the `fast_import` call, every job run puts a
status object into `{s3_prefix}/status/fast_import` with contents
`{"done": true}` or `{"done": false, "error": "..."}`. Added a test as
well.
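
The shape of the status object can be sketched as follows. The helper names are hypothetical and the actual upload to S3 is omitted; stripping a trailing slash from the prefix is also an assumption, not something the commit message states.

```python
import json
from typing import Optional


def status_key(s3_prefix: str) -> str:
    """Object key for the job status, relative to the given prefix."""
    return f"{s3_prefix.rstrip('/')}/status/fast_import"


def status_body(error: Optional[str] = None) -> str:
    """JSON body: success when no error is given, failure with a message otherwise."""
    if error is None:
        return json.dumps({"done": True})
    return json.dumps({"done": False, "error": error})
```

A consumer such as the control plane only needs to read this one object and check `done`, falling back to `error` for the failure message.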
2025-03-20 15:23:35 +00:00
Konstantin Knizhnik
3da70abfa5 Fix pageserver_try_receive (#11096)
## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1741176713523469

The problem is that this function uses `PQgetCopyData(shard->conn,
&resp_buff.data, 1 /* async = true */)`
to try to fetch the next message. But this function returns 0 if the whole
message is not present in the buffer.
The input buffer may contain only part of a message, so the result is not
fetched.

## Summary of changes

Use `PQisBusy` + `WaitEventSetWait` to check if data is available and
`PQgetCopyData(shard->conn, &resp_buff.data, 0)` to read the whole message
in this case.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-03-20 15:21:00 +00:00
Anastasia Lubennikova
e5aef3747c feat(compute_ctl): allow to change audit_log_level for existing (#11308)
projects.

Preserve the information about the current audit log level in compute
state, so that we don't relaunch rsyslog on every spec change

https://github.com/neondatabase/cloud/issues/25349

---------

Co-authored-by: Tristan Partin <tristan@neon.tech>
2025-03-20 11:23:20 +00:00
Arpad Müller
91dad2514f storcon: also support tenant deletion for safekeepers (#11289)
If a tenant gets deleted, also delete all of its timelines. We assume
that by the time a tenant is being deleted, no new timelines are being
created, so we don't need to worry about races with creation in this
situation.

Unlike #11233, which was very simple because it listed the timelines and
invoked timeline deletion, this PR obtains a list of safekeepers to
invoke the tenant deletion on, and then invokes tenant deletion on each
safekeeper that has one or multiple timelines.

Alternative to #11233
Builds on #11288
Part of #9011
2025-03-20 10:52:21 +00:00
Dmitrii Kovalkov
9bf59989db storcon: add https API (#11239)
## Problem

Pageservers use unencrypted HTTP requests for storage controller API.

- Closes: https://github.com/neondatabase/cloud/issues/25524

## Summary of changes

- Replace hyper0::server::Server with http_utils::server::Server in
storage controller.
- Add HTTPS handler for storage controller API.
- Support `ssl_ca_file` in pageserver.
2025-03-20 08:22:02 +00:00
290 changed files with 5795 additions and 3884 deletions

@@ -8,6 +8,7 @@ self-hosted-runner:
- small-arm64
- us-east-2
config-variables:
- AWS_ECR_REGION
- AZURE_DEV_CLIENT_ID
- AZURE_DEV_REGISTRY_NAME
- AZURE_DEV_SUBSCRIPTION_ID
@@ -15,23 +16,25 @@ config-variables:
- AZURE_PROD_REGISTRY_NAME
- AZURE_PROD_SUBSCRIPTION_ID
- AZURE_TENANT_ID
- BENCHMARK_INGEST_TARGET_PROJECTID
- BENCHMARK_LARGE_OLTP_PROJECTID
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
- DEV_AWS_OIDC_ROLE_ARN
- BENCHMARK_INGEST_TARGET_PROJECTID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- SLACK_ON_CALL_QA_STAGING_STREAM
- DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_CICD_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- HETZNER_CACHE_BUCKET
- HETZNER_CACHE_ENDPOINT
- HETZNER_CACHE_REGION
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- AWS_ECR_REGION
- BENCHMARK_LARGE_OLTP_PROJECTID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID
- SLACK_ON_CALL_DEVPROD_STREAM
- SLACK_ON_CALL_QA_STAGING_STREAM
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_RUST_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- SLACK_UPCOMING_RELEASE_CHANNEL_ID

@@ -128,29 +128,49 @@ jobs:
- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v17 build
id: cache_pg_17
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}

@@ -37,8 +37,14 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
- name: Cache poetry deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}

@@ -48,8 +48,13 @@ jobs:
submodules: true
- name: Cache cargo deps
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src

@@ -5,6 +5,9 @@ on:
github-event-name:
type: string
required: true
github-event-json:
type: string
required: true
outputs:
build-tag:
description: "Tag for the current workflow run"
@@ -27,6 +30,9 @@ on:
release-pr-run-id:
description: "Only available if `run-kind in [storage-release, proxy-release, compute-release]`. Contains the run ID of the `Build and Test` workflow, assuming one with the current commit can be found."
value: ${{ jobs.tags.outputs.release-pr-run-id }}
sha:
description: "github.event.pull_request.head.sha on release PRs, github.sha otherwise"
value: ${{ jobs.tags.outputs.sha }}
permissions: {}
@@ -45,6 +51,7 @@ jobs:
storage: ${{ steps.previous-releases.outputs.storage }}
run-kind: ${{ steps.run-kind.outputs.run-kind }}
release-pr-run-id: ${{ steps.release-pr-run-id.outputs.release-pr-run-id }}
sha: ${{ steps.sha.outputs.sha }}
permissions:
contents: read
steps:
@@ -54,10 +61,6 @@ jobs:
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
- name: Get run kind
id: run-kind
env:
@@ -78,6 +81,23 @@ jobs:
run: |
echo "run-kind=$RUN_KIND" | tee -a $GITHUB_OUTPUT
- name: Get the right SHA
id: sha
env:
SHA: >
${{
contains(fromJSON('["storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), steps.run-kind.outputs.run-kind)
&& fromJSON(inputs.github-event-json).pull_request.head.sha
|| github.sha
}}
run: |
echo "sha=$SHA" | tee -a $GITHUB_OUTPUT
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
ref: ${{ steps.sha.outputs.sha }}
- name: Get build tag
id: build-tag
env:
@@ -143,7 +163,7 @@ jobs:
if: ${{ contains(fromJSON('["storage-release", "compute-release", "proxy-release"]'), steps.run-kind.outputs.run-kind) }}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
CURRENT_SHA: ${{ github.sha }}
run: |
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release(-(proxy|compute))?/[0-9]{4}-[0-9]{2}-[0-9]{2}$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
echo "release-pr-run-id=$RELEASE_PR_RUN_ID" | tee -a $GITHUB_OUTPUT
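
Reviewer note: the `Get the right SHA` step above uses the `a && b || c` idiom of GitHub Actions expressions, which falls through to `github.sha` both when the run kind is not an RC PR and when the PR head SHA is empty. A minimal Rust sketch of that selection logic follows. The function name `select_sha` and the `Option<&str>` representation of the PR head SHA are illustrative assumptions, not part of the workflow.

```rust
/// Run kinds for which the workflow checks out the PR head commit
/// (taken from the `fromJSON` list in the step above).
const RC_PR_RUN_KINDS: [&str; 3] = ["storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"];

/// Hypothetical model of
/// `contains(...) && pull_request.head.sha || github.sha`.
fn select_sha(run_kind: &str, pr_head_sha: Option<&str>, github_sha: &str) -> String {
    let is_rc_pr = RC_PR_RUN_KINDS.contains(&run_kind);
    match pr_head_sha {
        // Only a non-empty head SHA on an RC PR wins; `&&` yields a falsy
        // value otherwise and `||` falls back to the commit SHA.
        Some(sha) if is_rc_pr && !sha.is_empty() => sha.to_string(),
        _ => github_sha.to_string(),
    }
}
```

Under this model, a `storage-release` run always resolves to `github.sha`, which matches the simplified `CURRENT_SHA: ${{ github.sha }}` in the last hunk.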

@@ -63,8 +63,13 @@ jobs:
- name: Cache postgres ${{ matrix.postgres-version }} build
id: cache_pg
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/${{ matrix.postgres-version }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -129,15 +134,25 @@ jobs:
- name: Cache postgres v17 build
id: cache_pg
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -203,32 +218,57 @@ jobs:
- name: Cache postgres v14 build
id: cache_pg
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_v15
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_v16
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v17 build
id: cache_pg_v17
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache cargo deps (only for v17)
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src
@@ -238,8 +278,13 @@ jobs:
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

@@ -80,6 +80,7 @@ jobs:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}
build-build-tools-image:
needs: [ check-permissions ]
@@ -248,8 +249,13 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Cache poetry deps
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
@@ -540,6 +546,7 @@ jobs:
uses: ./.github/workflows/trigger-e2e-tests.yml
with:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}
secrets: inherit
neon-image-arch:
@@ -563,6 +570,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
ref: ${{ needs.meta.outputs.sha }}
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
- uses: docker/setup-buildx-action@b5ca514318bd6ebac0fb2aedd5d36ec1b5c232a2 # v3.10.0
@@ -672,6 +680,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
ref: ${{ needs.meta.outputs.sha }}
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
- uses: docker/setup-buildx-action@b5ca514318bd6ebac0fb2aedd5d36ec1b5c232a2 # v3.10.0

@@ -23,7 +23,7 @@ jobs:
egress-policy: audit
- name: Export Workflow Run for the past 2 hours
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
@@ -43,7 +43,7 @@ jobs:
egress-policy: audit
- name: Export Workflow Run for the past 48 hours
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
@@ -63,7 +63,7 @@ jobs:
egress-policy: audit
- name: Export Workflow Run for the past 30 days
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"

@@ -9,6 +9,9 @@ on:
github-event-name:
type: string
required: true
github-event-json:
type: string
required: true
defaults:
run:
@@ -48,6 +51,7 @@ jobs:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ inputs.github-event-name || github.event_name }}
github-event-json: ${{ inputs.github-event-json || toJSON(github.event) }}
trigger-e2e-tests:
needs: [ meta ]

Cargo.lock generated

@@ -148,9 +148,9 @@ dependencies = [
[[package]]
name = "arc-swap"
version = "1.6.0"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "archery"
@@ -1167,18 +1167,6 @@ dependencies = [
"half",
]
[[package]]
name = "cicc"
version = "0.0.0"
dependencies = [
"anyhow",
"clap",
"glob",
"serde",
"toml",
"url",
]
[[package]]
name = "clang-sys"
version = "1.6.1"
@@ -2510,9 +2498,9 @@ dependencies = [
[[package]]
name = "glob"
version = "0.3.2"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "governor"
@@ -2821,6 +2809,7 @@ name = "http-utils"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"bytes",
"camino",
"fail",
@@ -2833,6 +2822,7 @@ dependencies = [
"pprof",
"regex",
"routerify",
"rustls 0.23.18",
"rustls-pemfile 2.1.1",
"serde",
"serde_json",
@@ -3871,11 +3861,10 @@ dependencies = [
[[package]]
name = "num-bigint"
version = "0.4.3"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
@@ -3924,11 +3913,10 @@ dependencies = [
[[package]]
name = "num-integer"
version = "0.1.45"
version = "0.1.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
dependencies = [
"autocfg",
"num-traits",
]
@@ -3957,9 +3945,9 @@ dependencies = [
[[package]]
name = "num-traits"
version = "0.2.15"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
"libm",
@@ -4704,7 +4692,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#394851e467755562b4173ff68f9eb0e7f737be13"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
dependencies = [
"base64 0.22.1",
"byteorder",
@@ -4738,7 +4726,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#394851e467755562b4173ff68f9eb0e7f737be13"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
dependencies = [
"bytes",
"chrono",
@@ -5372,26 +5360,25 @@ dependencies = [
[[package]]
name = "redis"
version = "0.25.2"
version = "0.29.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d64e978fd98a0e6b105d066ba4889a7301fca65aeac850a877d8797343feeb"
checksum = "b110459d6e323b7cda23980c46c77157601199c9da6241552b284cd565a7a133"
dependencies = [
"async-trait",
"arc-swap",
"bytes",
"combine",
"futures-util",
"itoa",
"num-bigint",
"percent-encoding",
"pin-project-lite",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"rustls-pemfile 2.1.1",
"rustls-pki-types",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"ryu",
"sha1_smol",
"socket2",
"tokio",
"tokio-rustls 0.25.0",
"tokio-rustls 0.26.0",
"tokio-util",
"url",
]
@@ -6617,6 +6604,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"camino",
"chrono",
"clap",
"clashmap",
@@ -6660,6 +6648,7 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls 0.26.0",
"tokio-util",
"tracing",
"utils",
@@ -7182,7 +7171,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.10"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#394851e467755562b4173ff68f9eb0e7f737be13"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
dependencies = [
"async-trait",
"byteorder",
@@ -7225,15 +7214,14 @@ dependencies = [
"bytes",
"fallible-iterator",
"futures-util",
"log",
"parking_lot 0.12.1",
"phf",
"pin-project-lite",
"postgres-protocol2",
"postgres-types2",
"serde",
"tokio",
"tokio-util",
"tracing",
]
[[package]]

@@ -1,7 +1,6 @@
[workspace]
resolver = "2"
members = [
"cicc",
"compute_tools",
"control_plane",
"control_plane/storcon_cli",
@@ -51,7 +50,7 @@ license = "Apache-2.0"
[workspace.dependencies]
ahash = "0.8"
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
arc-swap = "1.7"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
flate2 = "1.0.26"
@@ -96,7 +95,6 @@ futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
git-version = "0.3"
glob = "0.3.2"
governor = "0.8"
hashbrown = "0.14"
hashlink = "0.9.1"
@@ -132,7 +130,7 @@ nix = { version = "0.27", features = ["dir", "fs", "process", "socket", "signal"
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.15"
num-traits = "0.2.19"
once_cell = "1.13"
opentelemetry = "0.27"
opentelemetry_sdk = "0.27"
@@ -148,7 +146,7 @@ procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13"
rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
@@ -212,7 +210,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["s
try-lock = "0.2.5"
twox-hash = { version = "1.6.3", default-features = false }
typed-json = "0.1"
url = { version = "2.2", features = ["serde"] }
url = "2.2"
urlencoding = "2.1"
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"

@@ -1,12 +0,0 @@
[package]
name = "cicc"
edition = "2024"
license.workspace = true
[dependencies]
anyhow.workspace = true
clap.workspace = true
glob.workspace = true
serde.workspace = true
toml.workspace = true
url.workspace = true

@@ -1,22 +0,0 @@
{
"$id": "extension",
"properties": {
"name": {
"type": "string"
},
"version": {
"type": "string"
},
"build-system": {
"enum": [
"pgxs"
]
},
"tarball": {
"type": "string"
},
"checksum": {
"type": "string"
}
}
}

@@ -1,145 +0,0 @@
use std::{io::Write, path::PathBuf};
use anyhow::Result;
use clap::Parser;
use glob::glob;
use serde::Deserialize;
#[derive(Debug, Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
directory: PathBuf,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
enum BuildSystem {
#[serde(rename = "pgxs")]
PGXS,
#[serde(rename = "cmake-ninja")]
CmakeNinja,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ExtensionManifest {
pub name: String,
pub version: String,
pub build_system: BuildSystem,
#[serde(default)]
pub build_arguments: Vec<String>,
pub tarball: String,
pub checksum: String,
pub trusted: bool,
}
impl TryFrom<PathBuf> for ExtensionManifest {
type Error = anyhow::Error;
fn try_from(path: PathBuf) -> std::result::Result<Self, Self::Error> {
let content = std::fs::read_to_string(path)?;
let manifest: ExtensionManifest = toml::from_str(&content)?;
Ok(manifest)
}
}
impl ExtensionManifest {
pub fn compile(self) {
let mut stdout = std::io::stdout().lock();
writeln!(
&mut stdout,
r"FROM build-deps AS {name}-src
ARG PG_VERSION
WORKDIR /ext-src
RUN wget '{tarball}' -O '{name}.tar.gz' && \
echo '{checksum} {name}.tar.gz' | sha256sum --check && \
mkdir '{name}-src' && cd '{name}-src' && tar xzf '../{name}.tar.gz' --strip-components=1 -C .
FROM pg-build AS {name}-build
COPY --from={name}-src /ext-src/ /ext-src/
WORKDIR /ext-src/{name}-src
",
name = self.name,
tarball = self.tarball,
checksum = self.checksum
)
.unwrap();
match self.build_system {
BuildSystem::PGXS => writeln!(
&mut stdout,
r#"RUN make -j "$(getconf _NPROCESSORS_ONLN)" install && \"#,
)
.unwrap(),
BuildSystem::CmakeNinja => writeln!(
&mut stdout,
r"RUN cmake -G Ninja -B build -DCMAKE_BUILD_TYPE=Release {build_args}
ninja -C install && \",
build_args = self.build_arguments.join(" ")
)
.unwrap(),
}
if self.trusted {
writeln!(
&mut stdout,
" echo 'trusted = true' >> '/usr/local/pgsql/share/extension/{name}.control",
name = self.name
)
.unwrap();
} else {
writeln!(&mut stdout, " true").unwrap();
}
write!(&mut stdout, "\n").unwrap();
}
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct DependencyManifest {}
fn main() -> Result<()> {
let cli = Cli::parse();
if !cli.directory.exists() {
eprintln!("Directory does not exist");
std::process::exit(1);
}
for entry in glob(cli.directory.join("*.toml").to_str().unwrap()).unwrap() {
let path = match entry {
Ok(entry) => entry,
Err(_) => continue,
};
let manifest: ExtensionManifest = match path.clone().try_into() {
Ok(manifest) => manifest,
Err(e) => {
eprintln!("Failed to read {}: {}", path.display(), e);
std::process::exit(1);
}
};
manifest.compile();
}
Ok(())
}
#[cfg(test)]
mod test {
use clap::CommandFactory;
use super::Cli;
#[test]
fn verify_cli() {
Cli::command().debug_assert()
}
}

@@ -1916,26 +1916,30 @@ RUN apt update && \
;; \
esac && \
apt install --no-install-recommends -y \
ca-certificates \
gdb \
liblz4-1 \
libreadline8 \
iproute2 \
libboost-iostreams1.74.0 \
libboost-regex1.74.0 \
libboost-serialization1.74.0 \
libboost-system1.74.0 \
libossp-uuid16 \
libcurl4 \
libevent-2.1-7 \
libgeos-c1v5 \
liblz4-1 \
libossp-uuid16 \
libprotobuf-c1 \
libreadline8 \
libsfcgal1 \
libxml2 \
libxslt1.1 \
libzstd1 \
libcurl4 \
libevent-2.1-7 \
locales \
lsof \
procps \
ca-certificates \
rsyslog \
screen \
tcpdump \
$VERSION_INSTALLS && \
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

@@ -1,20 +0,0 @@
name: ip4r
build-system: "pgxs"
trusted: true
pg14: &pg14
version: "2.10.1"
tarball: "https://github.com/timescale/timescaledb/archive/refs/tags/{version}.tar.gz"
checksum: 6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73
pg15:
<<: *pg14
pg16:
<<: *pg14
pg17:
<<: *pg14

@@ -1,9 +0,0 @@
name = "ip4r"
version = "2.4.2"
trusted = true
build-system = "pgxs"
tarball = "https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz"
checksum = "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b"

@@ -1,9 +0,0 @@
name = "ip4r"
version = "2.4.2"
trusted = true
build-system = "pgxs"
tarball = "https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz"
checksum = "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b"

@@ -1,9 +0,0 @@
name = "ip4r"
version = "2.4.2"
trusted = true
build-system = "pgxs"
tarball = "https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz"
checksum = "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b"

@@ -1,9 +0,0 @@
name = "ip4r"
version = "2.4.2"
trusted = true
build-system = "pgxs"
tarball = "https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz"
checksum = "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b"

@@ -1,14 +0,0 @@
name = "timescaledb"
version = "2.17.1"
trusted = true
build-system = "cmake-ninja"
build-arguments = [
"-DSEND_TELEMETRY_DEFAULT:BOOL=OFF",
"-DUSE_TELEMETRY:BOOL=OFF",
"-DAPACHE_ONLY:BOOL=ON",
]
tarball = "https://github.com/timescale/timescaledb/archive/refs/tags/2.17.1.tar.gz"
checksum = "6277cf43f5695e23dae1c5cfeba00474d730b66ed53665a84b787a6bb1a57e28"

@@ -45,7 +45,9 @@ use anyhow::{Context, Result};
use clap::Parser;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal};
use compute_tools::compute::{
BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
};
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;
@@ -57,10 +59,6 @@ use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
@@ -147,7 +145,7 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
let build_tag = runtime.block_on(init())?;
runtime.block_on(init())?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -174,8 +172,6 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
build_tag,
live_config_allowed: cli_spec.live_config_allowed,
},
cli_spec.spec,
@@ -189,7 +185,7 @@ fn main() -> Result<()> {
deinit_and_exit(exit_code);
}
async fn init() -> Result<String> {
async fn init() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
@@ -199,12 +195,9 @@ async fn init() -> Result<String> {
}
});
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
info!("build_tag: {build_tag}");
info!("compute build_tag: {}", &BUILD_TAG.to_string());
Ok(build_tag)
Ok(())
}
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
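
Reviewer note: this file no longer threads a `build_tag` string through `init()` and `ComputeNodeParams`; it reads a process-wide `BUILD_TAG` imported from `compute_tools::compute` instead. The same "resolve once, read anywhere" pattern can be sketched with only the standard library. This is an illustrative equivalent, not the code in this change: the accessor name `build_tag` and the use of `std::sync::OnceLock` are assumptions made for the sketch.

```rust
use std::sync::OnceLock;

// Fallback used when BUILD_TAG is not set at compile time (same default as the diff).
const BUILD_TAG_DEFAULT: &str = "latest";

/// Hypothetical accessor: resolves the tag on first use and caches it
/// for the lifetime of the process.
fn build_tag() -> &'static str {
    static TAG: OnceLock<String> = OnceLock::new();
    TAG.get_or_init(|| {
        // `option_env!` is evaluated at compile time, so the tag is baked
        // into the binary rather than read from the runtime environment.
        option_env!("BUILD_TAG")
            .unwrap_or(BUILD_TAG_DEFAULT)
            .to_string()
    })
}
```

Any call site can then log or label metrics with `build_tag()` without receiving the value as a parameter.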

@@ -31,6 +31,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use compute_tools::extension_server::{PostgresMajorVersion, get_pg_version};
use nix::unistd::Pid;
use std::ops::Not;
use tracing::{Instrument, error, info, info_span, warn};
use utils::fs_ext::is_directory_empty;
@@ -44,7 +45,7 @@ mod s3_uri;
const PG_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600);
const PG_WAIT_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(300);
#[derive(Subcommand, Debug)]
#[derive(Subcommand, Debug, Clone, serde::Serialize)]
enum Command {
/// Runs local postgres (neon binary), restores into it,
/// uploads pgdata to s3 to be consumed by pageservers
@@ -84,6 +85,15 @@ enum Command {
},
}
impl Command {
fn as_str(&self) -> &'static str {
match self {
Command::Pgdata { .. } => "pgdata",
Command::DumpRestore { .. } => "dump-restore",
}
}
}
#[derive(clap::Parser)]
struct Args {
#[clap(long, env = "NEON_IMPORTER_WORKDIR")]
@@ -437,7 +447,7 @@ async fn run_dump_restore(
#[allow(clippy::too_many_arguments)]
async fn cmd_pgdata(
s3_client: Option<aws_sdk_s3::Client>,
s3_client: Option<&aws_sdk_s3::Client>,
kms_client: Option<aws_sdk_kms::Client>,
maybe_s3_prefix: Option<s3_uri::S3Uri>,
maybe_spec: Option<Spec>,
@@ -506,14 +516,14 @@ async fn cmd_pgdata(
if let Some(s3_prefix) = maybe_s3_prefix {
info!("upload pgdata");
aws_s3_sync::upload_dir_recursive(
s3_client.as_ref().unwrap(),
s3_client.unwrap(),
Utf8Path::new(&pgdata_dir),
&s3_prefix.append("/pgdata/"),
)
.await
.context("sync dump directory to destination")?;
info!("write status");
info!("write pgdata status to s3");
{
let status_dir = workdir.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
@@ -550,13 +560,15 @@ async fn cmd_dumprestore(
&key_id,
spec.source_connstring_ciphertext_base64,
)
.await?;
.await
.context("decrypt source connection string")?;
let dest = if let Some(dest_ciphertext) =
spec.destination_connstring_ciphertext_base64
{
decode_connstring(kms_client.as_ref().unwrap(), &key_id, dest_ciphertext)
.await?
.await
.context("decrypt destination connection string")?
} else {
bail!(
"destination connection string must be provided in spec for dump_restore command"
@@ -601,7 +613,18 @@ pub(crate) async fn main() -> anyhow::Result<()> {
// Initialize AWS clients only if s3_prefix is specified
let (s3_client, kms_client) = if args.s3_prefix.is_some() {
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
// Create AWS config with enhanced retry settings
let config = aws_config::defaults(BehaviorVersion::v2024_03_28())
.retry_config(
aws_config::retry::RetryConfig::standard()
.with_max_attempts(5) // Retry up to 5 times
.with_initial_backoff(std::time::Duration::from_millis(200)) // Start with 200ms delay
.with_max_backoff(std::time::Duration::from_secs(5)), // Cap at 5 seconds
)
.load()
.await;
// Create clients from the config with enhanced retry settings
let s3_client = aws_sdk_s3::Client::new(&config);
let kms = aws_sdk_kms::Client::new(&config);
(Some(s3_client), Some(kms))
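
Reviewer note: to get a feel for what the retry settings in this hunk imply, the sketch below computes an upper bound on each retry delay under a plain doubling schedule with a cap. It is a simplified model, not the SDK's implementation: the real retry strategy applies jitter, so actual delays are expected to be at or below these ceilings. The helper names `backoff_ceiling` and `backoff_schedule` are hypothetical. As far as I know, the SDK counts the initial request toward `max_attempts`, so five attempts would leave at most four retries; treat that as an assumption to verify against the SDK documentation.

```rust
use std::time::Duration;

/// Upper bound on the delay before retry number `retry` (1-based), assuming
/// the delay doubles on every retry and is capped at `max`. Jitter is ignored.
fn backoff_ceiling(initial: Duration, max: Duration, retry: u32) -> Duration {
    let factor = 2u32.saturating_pow(retry.saturating_sub(1));
    initial.saturating_mul(factor).min(max)
}

/// Ceilings for every retry of a request that is tried `max_attempts` times
/// in total (the first attempt has no delay, hence `max_attempts - 1` entries).
fn backoff_schedule(initial: Duration, max: Duration, max_attempts: u32) -> Vec<Duration> {
    (1..max_attempts)
        .map(|retry| backoff_ceiling(initial, max, retry))
        .collect()
}
```

With the values from the diff (200 ms initial, 5 s cap, 5 attempts), this model never reaches the cap: the largest ceiling is 1.6 s.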
@@ -609,79 +632,108 @@ pub(crate) async fn main() -> anyhow::Result<()> {
(None, None)
};
let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
let spec_key = s3_prefix.append("/spec.json");
let object = s3_client
.as_ref()
.unwrap()
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
.send()
.await
.context("get spec from s3")?
.body
.collect()
.await
.context("download spec body")?;
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
} else {
None
};
match tokio::fs::create_dir(&args.working_directory).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if !is_directory_empty(&args.working_directory)
// Capture everything from spec assignment onwards to handle errors
let res = async {
let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
let spec_key = s3_prefix.append("/spec.json");
let object = s3_client
.as_ref()
.unwrap()
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
.send()
.await
.context("check if working directory is empty")?
{
bail!("working directory is not empty");
} else {
// ok
}
}
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
}
.context("get spec from s3")?
.body
.collect()
.await
.context("download spec body")?;
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
} else {
None
};
match args.command {
Command::Pgdata {
source_connection_string,
interactive,
pg_port,
num_cpus,
memory_mb,
} => {
cmd_pgdata(
s3_client,
kms_client,
args.s3_prefix,
spec,
match tokio::fs::create_dir(&args.working_directory).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if !is_directory_empty(&args.working_directory)
.await
.context("check if working directory is empty")?
{
bail!("working directory is not empty");
} else {
// ok
}
}
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
}
match args.command.clone() {
Command::Pgdata {
source_connection_string,
interactive,
pg_port,
args.working_directory,
args.pg_bin_dir,
args.pg_lib_dir,
num_cpus,
memory_mb,
)
.await?;
}
Command::DumpRestore {
source_connection_string,
destination_connection_string,
} => {
cmd_dumprestore(
kms_client,
spec,
} => {
cmd_pgdata(
s3_client.as_ref(),
kms_client,
args.s3_prefix.clone(),
spec,
source_connection_string,
interactive,
pg_port,
args.working_directory.clone(),
args.pg_bin_dir,
args.pg_lib_dir,
num_cpus,
memory_mb,
)
.await
}
Command::DumpRestore {
source_connection_string,
destination_connection_string,
args.working_directory,
args.pg_bin_dir,
args.pg_lib_dir,
} => {
cmd_dumprestore(
kms_client,
spec,
source_connection_string,
destination_connection_string,
args.working_directory.clone(),
args.pg_bin_dir,
args.pg_lib_dir,
)
.await
}
}
}
.await;
if let Some(s3_prefix) = args.s3_prefix {
info!("write job status to s3");
{
let status_dir = args.working_directory.join("status");
if std::fs::exists(&status_dir)?.not() {
std::fs::create_dir(&status_dir).context("create status directory")?;
}
let status_file = status_dir.join("fast_import");
let res_obj = match res {
Ok(_) => serde_json::json!({"command": args.command.as_str(), "done": true}),
Err(err) => {
serde_json::json!({"command": args.command.as_str(), "done": false, "error": err.to_string()})
}
};
std::fs::write(&status_file, res_obj.to_string()).context("write status file")?;
aws_s3_sync::upload_dir_recursive(
s3_client.as_ref().unwrap(),
&status_dir,
&s3_prefix.append("/status/"),
)
.await?;
.await
.context("sync status directory to destination")?;
}
}
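
Reviewer note: the restructured `main` wraps the fallible work in an `async` block, captures the `Result`, and writes a status object whether the command succeeded or failed. A synchronous, std-only sketch of that control flow is below. The `JobStatus` struct stands in for the `serde_json::json!` object in the diff, and `job_status` / `run_and_report` are hypothetical names. The sketch returns the captured result to the caller afterwards; whether the real `main` does the same is not visible in this hunk.

```rust
/// Hypothetical stand-in for the JSON status object written by the importer.
#[derive(Debug, PartialEq)]
struct JobStatus {
    command: &'static str,
    done: bool,
    error: Option<String>,
}

/// Build the status from the captured result instead of returning early on failure.
fn job_status<E: std::fmt::Display>(command: &'static str, res: &Result<(), E>) -> JobStatus {
    match res {
        Ok(()) => JobStatus { command, done: true, error: None },
        Err(err) => JobStatus { command, done: false, error: Some(err.to_string()) },
    }
}

/// Run `work`, pass its outcome to `report`, then hand the result back to the caller.
fn run_and_report<E: std::fmt::Display>(
    command: &'static str,
    work: impl FnOnce() -> Result<(), E>,
    report: impl FnOnce(&JobStatus),
) -> Result<(), E> {
    // No `?` here: a failure must still reach the reporting step.
    let res = work();
    report(&job_status(command, &res));
    res
}
```

The point of the pattern is that `?` inside the wrapped block no longer skips the status upload, so a consumer polling the status location can tell a failed job from one that is still running.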

@@ -20,6 +20,7 @@ use futures::future::join_all;
use futures::stream::FuturesUnordered;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
@@ -35,6 +36,7 @@ use crate::disk_quota::set_disk_quota;
use crate::installed_extensions::get_installed_extensions;
use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
use crate::monitor::launch_monitor;
use crate::pg_helpers::*;
use crate::rsyslog::{
@@ -49,6 +51,17 @@ use crate::{config, extension_server, local_proxy};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
// This is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
/// Build tag/version of the compute node binaries/image. It's tricky and ugly
/// to pass it everywhere as a part of `ComputeNodeParams`, so we use a
/// global static variable.
pub static BUILD_TAG: Lazy<String> = Lazy::new(|| {
option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string()
});
/// Static configuration params that don't change after startup. These mostly
/// come from the CLI args, or are derived from them.
@@ -72,7 +85,6 @@ pub struct ComputeNodeParams {
pub pgdata: String,
pub pgbin: String,
pub pgversion: String,
pub build_tag: String,
/// The port that the compute's external HTTP server listens on
pub external_http_port: u16,
@@ -173,6 +185,11 @@ impl ComputeState {
info!("Changing compute status from {} to {}", prev, status);
self.status = status;
state_changed.notify_all();
COMPUTE_CTL_UP.reset();
COMPUTE_CTL_UP
.with_label_values(&[&BUILD_TAG, format!("{}", status).as_str()])
.set(1);
}
pub fn set_failed_status(&mut self, err: anyhow::Error, state_changed: &Condvar) {
@@ -352,13 +369,19 @@ impl ComputeNode {
}
.launch(&this);
// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
// The internal HTTP server is needed for a further activation by control plane
// if compute was started for a pool, so we have to start server before hanging
// waiting for a spec.
crate::http::server::Server::Internal {
port: this.params.internal_http_port,
}
.launch(&this);
// HTTP server is running, so we can officially declare compute_ctl as 'up'
COMPUTE_CTL_UP
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
.set(1);
// If we got a spec from the CLI already, use that. Otherwise wait for the
// control plane to pass it to us with a /configure HTTP request
let pspec = if let Some(cli_spec) = cli_spec {
@@ -878,6 +901,14 @@ impl ComputeNode {
info!("Storage auth token not set");
}
config.application_name("compute_ctl");
if let Some(spec) = &compute_state.pspec {
config.options(&format!(
"-c neon.compute_mode={}",
spec.spec.mode.to_type_str()
));
}
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
@@ -2024,12 +2055,8 @@ LIMIT 100",
let mut download_tasks = Vec::new();
for library in &libs_vec {
let (ext_name, ext_path) = remote_extensions.get_ext(
library,
true,
&self.params.build_tag,
&self.params.pgversion,
)?;
let (ext_name, ext_path) =
remote_extensions.get_ext(library, true, &BUILD_TAG, &self.params.pgversion)?;
download_tasks.push(self.download_extension(ext_name, ext_path));
}
let results = join_all(download_tasks).await;

View File

@@ -117,6 +117,7 @@ pub fn write_postgres_conf(
writeln!(file, "lc_numeric='C.UTF-8'")?;
}
writeln!(file, "neon.compute_mode={}", spec.mode.to_type_str())?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Static(lsn) => {

View File

@@ -5,7 +5,7 @@ use axum::response::{IntoResponse, Response};
use http::StatusCode;
use serde::Deserialize;
use crate::compute::ComputeNode;
use crate::compute::{BUILD_TAG, ComputeNode};
use crate::http::JsonResponse;
use crate::http::extract::{Path, Query};
@@ -47,7 +47,7 @@ pub(in crate::http) async fn download_extension(
remote_extensions.get_ext(
&filename,
ext_server_params.is_library,
&compute.params.build_tag,
&BUILD_TAG,
&compute.params.pgversion,
)
};

View File

@@ -1,7 +1,8 @@
use metrics::core::{AtomicF64, Collector, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, UIntGaugeVec, register_gauge, register_int_counter_vec, register_uint_gauge_vec,
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -70,8 +71,19 @@ pub(crate) static AUDIT_LOG_DIR_SIZE: Lazy<GenericGauge<AtomicF64>> = Lazy::new(
.expect("failed to define a metric")
});
// Report that `compute_ctl` is up and what's the current compute status.
pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"compute_ctl_up",
"Whether compute_ctl is running",
&["build_tag", "status"]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = INSTALLED_EXTENSIONS.collect();
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
metrics.extend(CPLANE_REQUESTS_TOTAL.collect());
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());

View File

@@ -165,8 +165,11 @@ pub struct NeonStorageControllerConf {
/// Database url used when running multiple storage controller instances
pub database_url: Option<SocketAddr>,
/// Threshold for auto-splitting a tenant into shards
/// Thresholds for auto-splitting a tenant into shards.
pub split_threshold: Option<u64>,
pub max_split_shards: Option<u8>,
pub initial_split_threshold: Option<u64>,
pub initial_split_shards: Option<u8>,
pub max_secondary_lag_bytes: Option<u64>,
@@ -181,6 +184,8 @@ pub struct NeonStorageControllerConf {
pub timelines_onto_safekeepers: bool,
pub use_https_safekeeper_api: bool,
pub use_local_compute_notifications: bool,
}
impl NeonStorageControllerConf {
@@ -201,12 +206,16 @@ impl Default for NeonStorageControllerConf {
start_as_candidate: false,
database_url: None,
split_threshold: None,
max_split_shards: None,
initial_split_threshold: None,
initial_split_shards: None,
max_secondary_lag_bytes: None,
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
long_reconcile_threshold: None,
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}
}
}

View File

@@ -51,11 +51,19 @@ impl PageServerNode {
parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
let ssl_ca_cert = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist");
Certificate::from_pem(&buf).expect("CA certificate should be valid")
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("Client constructs with no errors");
let endpoint = if env.storage_controller.use_https_pageserver_api {
format!(
"https://{}",
@@ -72,6 +80,7 @@ impl PageServerNode {
conf: conf.clone(),
env: env.clone(),
http_client: mgmt_api::Client::new(
http_client,
endpoint,
{
match conf.http_auth_type {
@@ -83,9 +92,7 @@ impl PageServerNode {
}
}
.as_deref(),
ssl_ca_cert,
)
.expect("Client constructs with no errors"),
),
}
}
@@ -142,6 +149,10 @@ impl PageServerNode {
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
overrides.push(format!("ssl_ca_file='{}'", ssl_ca_file.to_str().unwrap()));
}
// Apply the user-provided overrides
overrides.push({
let mut doc =
@@ -417,11 +428,6 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'l0_flush_delay_threshold' as an integer")?,
l0_flush_wait_upload: settings
.remove("l0_flush_wait_upload")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'l0_flush_wait_upload' as a boolean")?,
l0_flush_stall_threshold: settings
.remove("l0_flush_stall_threshold")
.map(|x| x.parse::<usize>())

View File

@@ -1,6 +1,5 @@
use std::ffi::OsStr;
use std::fs;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::ExitStatus;
use std::str::FromStr;
@@ -18,7 +17,7 @@ use pageserver_api::models::{TenantConfigRequest, TimelineCreateRequest, Timelin
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use reqwest::{Certificate, Method};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -38,9 +37,9 @@ pub struct StorageController {
client: reqwest::Client,
config: NeonStorageControllerConf,
// The listen addresses is learned when starting the storage controller,
// The listen port is learned when starting the storage controller,
// hence the use of OnceLock to init it at the right time.
listen: OnceLock<SocketAddr>,
listen_port: OnceLock<u16>,
}
const COMMAND: &str = "storage_controller";
@@ -144,15 +143,26 @@ impl StorageController {
}
};
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("HTTP client should construct with no error");
Self {
env: env.clone(),
private_key,
public_key,
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
client: http_client,
config: env.storage_controller.clone(),
listen: OnceLock::default(),
listen_port: OnceLock::default(),
}
}
@@ -337,34 +347,34 @@ impl StorageController {
}
}
let (listen, postgres_port) = {
if let Some(base_port) = start_args.base_port {
(
format!("127.0.0.1:{base_port}"),
self.config
.database_url
.expect("--base-port requires NeonStorageControllerConf::database_url")
.port(),
)
} else {
let listen_url = self.env.control_plane_api.clone();
if self.env.generate_local_ssl_certs {
self.env.generate_ssl_cert(
&instance_dir.join("server.crt"),
&instance_dir.join("server.key"),
)?;
}
let listen = format!(
"{}:{}",
listen_url.host_str().unwrap(),
listen_url.port().unwrap()
);
let listen_url = &self.env.control_plane_api;
(listen, listen_url.port().unwrap() + 1)
}
let scheme = listen_url.scheme();
let host = listen_url.host_str().unwrap();
let (listen_port, postgres_port) = if let Some(base_port) = start_args.base_port {
(
base_port,
self.config
.database_url
.expect("--base-port requires NeonStorageControllerConf::database_url")
.port(),
)
} else {
let port = listen_url.port().unwrap();
(port, port + 1)
};
let socket_addr = listen
.parse()
.expect("listen address is a valid socket address");
self.listen
.set(socket_addr)
.expect("StorageController::listen is only set here");
self.listen_port
.set(listen_port)
.expect("StorageController::listen_port is only set here");
// Do we remove the pid file on stop?
let pg_started = self.is_postgres_running().await?;
@@ -500,20 +510,15 @@ impl StorageController {
drop(client);
conn.await??;
let listen = self
.listen
.get()
.expect("cell is set earlier in this function");
let addr = format!("{}:{}", host, listen_port);
let address_for_peers = Uri::builder()
.scheme("http")
.authority(format!("{}:{}", listen.ip(), listen.port()))
.scheme(scheme)
.authority(addr.clone())
.path_and_query("")
.build()
.unwrap();
let mut args = vec![
"-l",
&listen.to_string(),
"--dev",
"--database-url",
&database_url,
@@ -530,6 +535,14 @@ impl StorageController {
.map(|s| s.to_string())
.collect::<Vec<_>>();
match scheme {
"http" => args.extend(["--listen".to_string(), addr]),
"https" => args.extend(["--listen-https".to_string(), addr]),
_ => {
panic!("Unexpected url scheme in control_plane_api: {scheme}");
}
}
if self.config.start_as_candidate {
args.push("--start-as-candidate".to_string());
}
@@ -542,6 +555,10 @@ impl StorageController {
args.push("--use-https-safekeeper-api".to_string());
}
if self.config.use_local_compute_notifications {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}
@@ -570,6 +587,20 @@ impl StorageController {
args.push(format!("--split-threshold={split_threshold}"))
}
if let Some(max_split_shards) = self.config.max_split_shards.as_ref() {
args.push(format!("--max-split-shards={max_split_shards}"))
}
if let Some(initial_split_threshold) = self.config.initial_split_threshold.as_ref() {
args.push(format!(
"--initial-split-threshold={initial_split_threshold}"
))
}
if let Some(initial_split_shards) = self.config.initial_split_shards.as_ref() {
args.push(format!("--initial-split-shards={initial_split_shards}"))
}
if let Some(lag) = self.config.max_secondary_lag_bytes.as_ref() {
args.push(format!("--max-secondary-lag-bytes={lag}"))
}
@@ -590,6 +621,8 @@ impl StorageController {
args.push("--timelines-onto-safekeepers".to_string());
}
println!("Starting storage controller");
background_process::start_process(
COMMAND,
&instance_dir,
@@ -716,30 +749,26 @@ impl StorageController {
{
// In the special case of the `storage_controller start` subcommand, we wish
// to use the API endpoint of the newly started storage controller in order
// to pass the readiness check. In this scenario [`Self::listen`] will be set
// (see [`Self::start`]).
// to pass the readiness check. In this scenario [`Self::listen_port`] will
// be set (see [`Self::start`]).
//
// Otherwise, we infer the storage controller api endpoint from the configured
// control plane API.
let url = if let Some(socket_addr) = self.listen.get() {
Url::from_str(&format!(
"http://{}:{}/{path}",
socket_addr.ip().to_canonical(),
socket_addr.port()
))
.unwrap()
let port = if let Some(port) = self.listen_port.get() {
*port
} else {
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let listen_url = self.env.control_plane_api.clone();
Url::from_str(&format!(
"http://{}:{}/{path}",
listen_url.host_str().unwrap(),
listen_url.port().unwrap()
))
.unwrap()
self.env.control_plane_api.port().unwrap()
};
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let url = Url::from_str(&format!(
"{}://{}:{port}/{path}",
self.env.control_plane_api.scheme(),
self.env.control_plane_api.host_str().unwrap(),
))
.unwrap();
let mut builder = self.client.request(method, url);
if let Some(body) = body {
builder = builder.json(&body)

View File

@@ -20,7 +20,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use reqwest::{Certificate, Method, StatusCode, Url};
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -274,7 +274,7 @@ struct Cli {
jwt: Option<String>,
#[arg(long)]
/// Trusted root CA certificate to use in https APIs.
/// Trusted root CA certificates to use in https APIs.
ssl_ca_file: Option<PathBuf>,
#[command(subcommand)]
@@ -387,17 +387,23 @@ async fn main() -> anyhow::Result<()> {
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
let ssl_ca_cert = match &cli.ssl_ca_file {
let ssl_ca_certs = match &cli.ssl_ca_file {
Some(ssl_ca_file) => {
let buf = tokio::fs::read(ssl_ca_file).await?;
Some(reqwest::Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
};
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;
let mut trimmed = cli.api.to_string();
trimmed.pop();
let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref(), ssl_ca_cert)?;
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
match cli.command {
Command::NodeRegister {

View File

@@ -0,0 +1,196 @@
## Summary
This is a retrospective RFC to document the design of the `storage-controller` service.
This service manages the physical mapping of Tenants and Timelines to Pageservers and Safekeepers. It
acts as the API for "storage" as an abstract concept: enabling other parts of the system to reason
about things like creating/deleting tenants and timelines without having to understand exactly which
pageserver and safekeeper to communicate with, or any subtle rules about how to orchestrate these things.
The storage controller was implemented in the first half of 2024 as an essential part
of storage sharding, especially [shard splitting](032-shard-splitting.md).
It initially managed only pageservers, but was extended in 2025 to also manage safekeepers. In
some places you may see unqualified references to 'nodes' -- those are pageservers.
## Design Choices
### Durability
We rely on an external postgres for all durable state. No local storage is used.
We avoid any unnecessary I/O to durable storage. For example:
- most tracking of in-flight changes to the system is done in-memory rather than recording progress/steps in a database
- When migrating tenant shards between pageservers we only touch the database to increment generation numbers,
we do not persist the total state of a tenant shard.
Being frugal with database I/O has two benefits:
- It avoids the database becoming a practical scaling bottleneck (we expect in-memory scale issues to be hit
before we hit e.g. transactions-per-second issues)
- It reduces cost when using a cloud database service to run the controller's postgres database.
The trade-off is that there is a "bootstrapping" problem: a controller can't be deployed in isolation, one
must first have some existing database system. In practice, we expect that Neon is deployed in one of the
following ways:
- into a cloud which has a postgres service that can be used to run the controller
- into a mature on-prem environment that has existing facilities for running databases
- into a test/dev environment where a simple one-node vanilla postgres installation is sufficient
### Consensus
The controller does _not_ implement any strong consensus mechanism of its own. Instead:
- Where strong consistency is required (for example, for pageserver generation numbers), this
responsibility is delegated to a transaction in our postgres database.
- Highly available deploys are done using a simple in-database record of what controller instances
are available, distinguished by timestamps, rather than having controllers directly negotiate a leader.
Avoiding strong consensus among controller processes is a cost saving (we avoid running three controllers
all the time), and simplifies implementation (we do not have to phrase all configuration changes as e.g. Raft
transactions).
The trade-off is that under some circumstances a controller with partial network isolation can cause availability
issues in the cluster, by making changes to pageserver state that might disagree with what the "true" active
controller is trying to do. The impact of this is bounded by our `controllers` database table, that enables
a rogue node to eventually realise that it is not the leader and step down. If a rogue node can't reach
the database, then it implicitly stops making progress. A rogue controller cannot durably damage the system
because pageserver data and safekeeper configs are protected by generation numbers that are only updated
via postgres transactions (i.e. no controller "trusts itself" to independently make decisions about generations).
### Scale
We design for high but not unlimited scale. The memory footprint of each tenant shard is small (~8kB), so
it is realistic to scale up to a million attached shards on a server with modest resources. Tenants in
a detached state (i.e. not active on pageservers) do not need to be managed by storage controller, and can
be relegated from memory to the database.
Typically, a tenant shard is updated about once a week, when we do a deploy. During deploys, we relocate
a few thousand tenants from each pageserver while it is restarted, so it is extremely rare for the controller
to have to do O(N) work (on all shards at once).
There are places where we do O(N) work:
- On normal startup, when loading from the database into memory
- On unclean startup (with no handover of observed state from a previous controller), where we will
scan all shards on all pageservers.
It is important that these locations are written efficiently. At high scale we should still expect runtimes
of the order of tens of seconds to complete a storage controller start.
When the practical scale limit of a single storage controller is reached, just deploy another one with its
own pageservers & safekeepers: each controller+its storage servers should be thought of as a logical cluster
or "cell" of storage.
# High Level Design
The storage controller is an in-memory system (i.e. state for all attached
tenants is held in memory _as well as_ being represented in durable postgres storage).
## Infrastructure
The storage controller is an async rust binary using tokio.
The storage controller is built around the `Service` type. This implements
all the entry points for the outside world's interaction with the controller (HTTP handlers are mostly thin wrappers of service functions),
and holds most in-memory state (e.g. the list of tenant shards).
The state is held in a `ServiceInner` wrapped in a RwLock. This monolithic
lock is used to simplify reasoning about code that mutates state: each function that takes a write lock may be thought of as a serializable transaction on the in-memory state. This lock is clearly a bottleneck, but
nevertheless is scalable to managing millions of tenants.
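
As a minimal sketch of the single-lock pattern described above (not the controller's actual code): every method takes the one lock, so a write-locked method behaves like a serializable transaction over the in-memory state. The field `attached` and the methods `schedule`, `evacuate` and `shards_on` are hypothetical names chosen for illustration, and `std::sync::RwLock` stands in for whatever lock type the real `Service` uses.

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

type NodeId = u64;

/// All mutable in-memory state lives behind one lock (hypothetical fields).
#[derive(Default)]
struct ServiceInner {
    /// Shard id -> pageserver the shard is intended to be attached to.
    attached: BTreeMap<String, NodeId>,
}

#[derive(Default)]
pub struct Service {
    inner: RwLock<ServiceInner>,
}

impl Service {
    /// Record where a shard should be attached.
    pub fn schedule(&self, shard: &str, node: NodeId) {
        let mut inner = self.inner.write().unwrap();
        inner.attached.insert(shard.to_string(), node);
    }

    /// Move every shard off `from` in one write-locked step, so no reader
    /// can observe a half-finished evacuation. Returns the number moved.
    pub fn evacuate(&self, from: NodeId, to: NodeId) -> usize {
        let mut inner = self.inner.write().unwrap();
        let mut moved = 0;
        for node in inner.attached.values_mut() {
            if *node == from {
                *node = to;
                moved += 1;
            }
        }
        moved
    }

    /// Read-only query: many of these can run concurrently.
    pub fn shards_on(&self, node: NodeId) -> Vec<String> {
        let inner = self.inner.read().unwrap();
        inner
            .attached
            .iter()
            .filter(|(_, n)| **n == node)
            .map(|(shard, _)| shard.clone())
            .collect()
    }
}
```

The trade-off is the one the text names: the lock serialises all writers, but each critical section only touches memory, so it stays short.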
Persistent state is held in a postgres database, and we use the `diesel` crate to provide database client functionality. All database access is wrapped in the `Persistence` type -- this makes it easy to understand which
code is touching the database. The database is only used when necessary, i.e. for state that cannot be recovered another way. For example, we do not store the secondary pageserver locations of tenant shards in the database, rather we learn these at startup from running pageservers, and/or make scheduling decisions to fill in the gaps. This adds some complexity, but massively reduces the load on the database, and enables running the storage controller with a very cheap postgres instance.
## Pageserver tenant scheduling & reconciliation
### Intent & observed state
Each tenant shard is represented by type `TenantShard`, which has an 'intent' and 'observed' state. Setting the
intent state is called _scheduling_, and doing remote I/O to make observed
state match intent state is called _reconciliation_.
The `Scheduler` type is responsible for making choices about the intent
state, such as choosing a pageserver for a new tenant shard, or assigning
a replacement pageserver when the original one fails.
The observed state is updated after tenant reconciliation (see below), and
has the concept of a `None` state for a pageserver, indicating unknown state. This is used to ensure that we can safely clean up after we start
but do not finish a remote call to a pageserver, or if a pageserver restarts and we are uncertain of its state.
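
The relationship between intent and observed state can be sketched roughly as follows. This is an illustrative simplification, not the real `TenantShard` layout: `IntentState`, `ObservedState`, `LocationMode` and `needs_reconcile` are assumed names, and the only behaviour taken from the text is that an observed entry of `None` means "unknown" and must trigger reconciliation.

```rust
use std::collections::HashMap;

type NodeId = u64;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LocationMode {
    Attached,
    Secondary,
}

/// Where we want the shard to be (set by scheduling).
#[derive(Default)]
pub struct IntentState {
    pub attached: Option<NodeId>,
    pub secondary: Vec<NodeId>,
}

/// What we believe each pageserver currently holds.
#[derive(Default)]
pub struct ObservedState {
    /// `None` means the state on that pageserver is unknown, e.g. after an
    /// interrupted remote call or a pageserver restart.
    pub locations: HashMap<NodeId, Option<LocationMode>>,
}

/// Returns true if remote I/O is needed to make observed match intent.
pub fn needs_reconcile(intent: &IntentState, observed: &ObservedState) -> bool {
    // Collect the locations the intent asks for.
    let mut wanted: HashMap<NodeId, LocationMode> = HashMap::new();
    if let Some(node) = intent.attached {
        wanted.insert(node, LocationMode::Attached);
    }
    for node in &intent.secondary {
        wanted.insert(*node, LocationMode::Secondary);
    }

    // Any unknown entry forces a reconcile so stale state gets cleaned up.
    if observed.locations.values().any(|loc| loc.is_none()) {
        return true;
    }
    // Every wanted location must be observed in the wanted mode ...
    let all_present = wanted
        .iter()
        .all(|(node, mode)| observed.locations.get(node) == Some(&Some(*mode)));
    // ... and nothing beyond the wanted locations may be observed.
    let no_extras = observed
        .locations
        .keys()
        .all(|node| wanted.contains_key(node));
    !(all_present && no_extras)
}
```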
### Tenant Reconciliation
The `Reconciler` type is responsible for updating pageservers to achieve
the intent state. It is instantiated when `Service` determines that a shard requires reconciliation, and owned by a background tokio task that
runs it to completion. Reconciler does not have access to the `Service` state: it is populated with a snapshot of relevant information when constructed, and submits its results to a channel that `Service` consumes
to update the tenant shard's observed state.
The Reconciler does have access to the database, but only uses it for
a single purpose: updating shards' generation numbers immediately before
attaching them to a pageserver.
Operations that change a tenant's scheduling will spawn a reconciler if
necessary, and there is also a background loop which checks every shard
for the need to reconcile -- this background loop ensures eventual progress
if some earlier reconciliations failed for some reason.
The reconciler has a general purpose code path which will attach/detach from pageservers as necessary, and a special case path for live migrations. The live migration case is more common in practice, and is taken whenever the current observed state indicates that we have a healthy attached location to migrate from. This implements live migration as described in the earlier [live migration RFC](028-pageserver-migration.md).
### Scheduling optimisation
During the periodic background reconciliation loop, the controller also
performs _scheduling optimisation_. This is the process of looking for
shards that are in sub-optimal locations, and moving them.
Typically, this means:
- Shards attached outside their preferred AZ (e.g. after a node failure), to migrate them back to their preferred AZ
- Shards attached on the same pageserver as some other shards in the same
tenant, to migrate them elsewhere (e.g. after a shard split)
Scheduling optimisation is a multi-step process to ensure graceful cutovers, e.g. by creating a new secondary location, waiting for it to
warm up, then cutting over. This is not done as an explicit queue
of operations, but rather by iteratively calling the optimisation
function, which will recognise each intervening state as something
that can generate the next optimisation.
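
One way to picture "no explicit queue, just call the optimisation function again" is a pure function from the current placement to the next step. The sketch below is hypothetical: `ShardPlacement`, `Step` and `next_step` are illustrative names, and the three steps are a simplification of the cutover sequence described above.

```rust
type NodeId = u64;

#[derive(Debug, Clone, PartialEq)]
pub struct ShardPlacement {
    /// Pageserver the shard is currently attached to.
    pub attached: NodeId,
    /// Pageserver we would prefer (e.g. in the preferred AZ).
    pub preferred: NodeId,
    /// Secondary location, and whether it has finished warming up.
    pub secondary: Option<(NodeId, bool)>,
}

#[derive(Debug, PartialEq)]
pub enum Step {
    CreateSecondary(NodeId),
    WaitForWarmup(NodeId),
    CutOver { from: NodeId, to: NodeId },
}

/// Looks only at the current state and returns the next step, if any.
/// Each intermediate state is recognised on a later call, so no queue of
/// pending operations has to be stored anywhere.
pub fn next_step(p: &ShardPlacement) -> Option<Step> {
    if p.attached == p.preferred {
        return None; // already optimal
    }
    match p.secondary {
        Some((node, true)) if node == p.preferred => Some(Step::CutOver {
            from: p.attached,
            to: node,
        }),
        Some((node, false)) if node == p.preferred => Some(Step::WaitForWarmup(node)),
        _ => Some(Step::CreateSecondary(p.preferred)),
    }
}
```

Because the function is stateless, a controller restart in the middle of a migration loses nothing: the next pass of the background loop sees the half-migrated placement and resumes from there.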
### Pageserver heartbeats and failure
The `Heartbeater` type is responsible for detecting when a pageserver
becomes unavailable. This is fed back into `Service` for action: when
a pageserver is marked unavailable, tenant shards on that pageserver are
rescheduled and Reconcilers are spawned to cut them over to their new location.
## Pageserver timeline CRUD operations
By CRUD operations, we mean creating and deleting timelines. The authoritative storage for which timelines exist on the pageserver
is in S3, and is governed by the pageserver's system of generation
numbers. Because a shard can be attached to multiple pageservers
concurrently, we need to handle this when doing timeline CRUD operations:
- A timeline operation is only persistent if _after_ the ack from a pageserver, that pageserver's generation is still the latest.
- For deletions in particular, they are only persistent if _all_ attached
locations have acked the deletion operation, since if only the latest one
has acked then the timeline could still return from the dead if some old-generation attachment writes an index for it.
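
The two rules above can be written down as predicates. This is a sketch under assumptions: `AttachedLocation`, `create_is_persistent` and `delete_is_persistent` are hypothetical names, and `latest_generation` is assumed to have been re-read from the database after the acks arrived.

```rust
/// One pageserver location a shard is currently attached to.
#[derive(Clone, Copy, Debug)]
pub struct AttachedLocation {
    /// Generation this location was attached with.
    pub generation: u32,
    /// Whether this location acknowledged the timeline operation.
    pub acked: bool,
}

/// A creation counts as persistent only if a location that acked it still
/// holds the latest generation when checked after the ack.
pub fn create_is_persistent(locations: &[AttachedLocation], latest_generation: u32) -> bool {
    locations
        .iter()
        .any(|l| l.acked && l.generation == latest_generation)
}

/// A deletion counts as persistent only if every attached location acked
/// it; otherwise an old-generation attachment could still write an index
/// that brings the timeline back.
pub fn delete_is_persistent(locations: &[AttachedLocation]) -> bool {
    !locations.is_empty() && locations.iter().all(|l| l.acked)
}
```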
## Zero-downtime controller deployments
When two storage controllers run at the same time, they coordinate via
the database to establish one leader, and the other controller may proxy
requests to this leader.
See [Storage controller restarts RFC](037-storage-controller-restarts.md).
Note that this is not a strong consensus mechanism: the controller must also survive split-brain situations. This is respected by code that
e.g. increments version numbers, which uses database transactions that
check the expected value before modifying it. A split-brain situation can
impact availability (e.g. if two controllers are fighting over where to
attach a shard), but it should never impact durability and data integrity.
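
The "check the expected value before modifying it" rule is a compare-and-set. The sketch below simulates it with an in-memory map guarded by a mutex; in the real controller this role is played by a postgres transaction, and `GenerationStore`, `increment_if_current` and `CasError` are hypothetical names used only for illustration.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Stand-in for the database table holding one generation per tenant shard.
#[derive(Default)]
pub struct GenerationStore {
    rows: Mutex<HashMap<String, u32>>,
}

#[derive(Debug, PartialEq)]
pub enum CasError {
    /// Someone else changed the generation first; the caller must re-read.
    Conflict { actual: u32 },
    /// No row exists for this shard.
    NotFound,
}

impl GenerationStore {
    /// Create the row for a shard (or overwrite it).
    pub fn insert(&self, shard: &str, generation: u32) {
        self.rows
            .lock()
            .unwrap()
            .insert(shard.to_string(), generation);
    }

    /// Increment the generation only if it still equals `expected`.
    /// The check and the update happen under one lock, mirroring a
    /// transaction that verifies the old value before writing the new one.
    pub fn increment_if_current(&self, shard: &str, expected: u32) -> Result<u32, CasError> {
        let mut rows = self.rows.lock().unwrap();
        let current = rows.get_mut(shard).ok_or(CasError::NotFound)?;
        if *current != expected {
            return Err(CasError::Conflict { actual: *current });
        }
        *current += 1;
        Ok(*current)
    }
}
```

With two controllers racing from the same starting value, exactly one increment succeeds; the loser receives `Conflict` and learns that its view is stale, which is why split-brain can cost availability but not data integrity.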
## Graceful drain & fill of pageservers during deploys
The storage controller has functionality for draining + filling pageservers
while deploying new pageserver binaries, so that clients are not actively
using a pageserver while it restarts.
See [Graceful restarts RFC](033-storage-controller-drain-and-fill.md).
## Safekeeper timeline scheduling
This is currently under development, see [Safekeeper dynamic membership change RFC](035-safekeeper-dynamic-membership-change.md).

View File

@@ -275,6 +275,18 @@ pub enum ComputeMode {
Replica,
}
impl ComputeMode {
/// Convert the compute mode to a string that can be used to identify the type of compute,
/// which means that if it's a static compute, the LSN will not be included.
pub fn to_type_str(&self) -> &'static str {
match self {
ComputeMode::Primary => "primary",
ComputeMode::Static(_) => "static",
ComputeMode::Replica => "replica",
}
}
}
/// Log level for audit logging
/// Disabled, log, hipaa
/// Default is Disabled

View File

@@ -6,6 +6,7 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
bytes.workspace = true
camino.workspace = true
fail.workspace = true
@@ -18,14 +19,15 @@ pprof.workspace = true
regex.workspace = true
routerify.workspace = true
rustls-pemfile.workspace = true
serde.workspace = true
rustls.workspace = true
serde_json.workspace = true
serde_path_to_error.workspace = true
serde.workspace = true
thiserror.workspace = true
tracing.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
url.workspace = true
uuid.workspace = true

View File

@@ -1,21 +1,124 @@
use std::{sync::Arc, time::Duration};
use anyhow::Context;
use arc_swap::ArcSwap;
use camino::Utf8Path;
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls::{
pki_types::{CertificateDer, PrivateKeyDer},
server::{ClientHello, ResolvesServerCert},
sign::CertifiedKey,
};
pub fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result<Vec<CertificateDer<'static>>> {
let file = std::fs::File::open(filename)?;
let mut reader = std::io::BufReader::new(file);
pub async fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result<Vec<CertificateDer<'static>>> {
let cert_data = tokio::fs::read(filename)
.await
.context(format!("failed reading certificate file {filename:?}"))?;
let mut reader = std::io::Cursor::new(&cert_data);
Ok(rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?)
let cert_chain = rustls_pemfile::certs(&mut reader)
.collect::<Result<Vec<_>, _>>()
.context(format!("failed parsing certificate from file {filename:?}"))?;
Ok(cert_chain)
}
pub fn load_private_key(filename: &Utf8Path) -> anyhow::Result<PrivateKeyDer<'static>> {
let file = std::fs::File::open(filename)?;
let mut reader = std::io::BufReader::new(file);
pub async fn load_private_key(filename: &Utf8Path) -> anyhow::Result<PrivateKeyDer<'static>> {
let key_data = tokio::fs::read(filename)
.await
.context(format!("failed reading private key file {filename:?}"))?;
let mut reader = std::io::Cursor::new(&key_data);
let key = rustls_pemfile::private_key(&mut reader)?;
let key = rustls_pemfile::private_key(&mut reader)
.context(format!("failed parsing private key from file {filename:?}"))?;
key.ok_or(anyhow::anyhow!(
"no private key found in {}",
filename.as_str(),
))
}
pub async fn load_certified_key(
key_filename: &Utf8Path,
cert_filename: &Utf8Path,
) -> anyhow::Result<CertifiedKey> {
let cert_chain = load_cert_chain(cert_filename).await?;
let key = load_private_key(key_filename).await?;
let key = rustls::crypto::ring::default_provider()
.key_provider
.load_private_key(key)?;
let certified_key = CertifiedKey::new(cert_chain, key);
certified_key.keys_match()?;
Ok(certified_key)
}
/// Implementation of [`rustls::server::ResolvesServerCert`] which reloads certificates from
/// the disk periodically.
#[derive(Debug)]
pub struct ReloadingCertificateResolver {
certified_key: ArcSwap<CertifiedKey>,
}
impl ReloadingCertificateResolver {
/// Creates a new Resolver by loading certificate and private key from FS and
/// creating tokio::task to reload them with provided reload_period.
pub async fn new(
key_filename: &Utf8Path,
cert_filename: &Utf8Path,
reload_period: Duration,
) -> anyhow::Result<Arc<Self>> {
let this = Arc::new(Self {
certified_key: ArcSwap::from_pointee(
load_certified_key(key_filename, cert_filename).await?,
),
});
tokio::spawn({
let weak_this = Arc::downgrade(&this);
let key_filename = key_filename.to_owned();
let cert_filename = cert_filename.to_owned();
async move {
let start = tokio::time::Instant::now() + reload_period;
let mut interval = tokio::time::interval_at(start, reload_period);
let mut last_reload_failed = false;
loop {
interval.tick().await;
let this = match weak_this.upgrade() {
Some(this) => this,
None => break, // Resolver has been destroyed, exit.
};
match load_certified_key(&key_filename, &cert_filename).await {
Ok(new_certified_key) => {
if new_certified_key.cert == this.certified_key.load().cert {
tracing::debug!("Certificate has not changed since last reloading");
} else {
tracing::info!("Certificate has been reloaded");
this.certified_key.store(Arc::new(new_certified_key));
}
last_reload_failed = false;
}
Err(err) => {
// Note: Reloading certs may fail if it conflicts with the script updating
// the files at the same time. Warn only if the error is persistent.
if last_reload_failed {
tracing::warn!("Error reloading certificate: {err:#}");
} else {
tracing::info!("Error reloading certificate: {err:#}");
}
last_reload_failed = true;
}
}
}
}
});
Ok(this)
}
}
impl ResolvesServerCert for ReloadingCertificateResolver {
fn resolve(&self, _client_hello: ClientHello<'_>) -> Option<Arc<CertifiedKey>> {
Some(self.certified_key.load_full())
}
}
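
The reload loop above holds only a `Weak` reference, so it exits once the resolver is dropped, and it replaces the stored value only when the freshly loaded one differs. A minimal, dependency-free sketch of one loop iteration follows. `Holder`, `Tick`, and `reload_tick` are hypothetical names introduced for illustration, and a `RwLock<Arc<String>>` stands in for the `ArcSwap<CertifiedKey>` used in the diff.

```rust
use std::sync::{Arc, RwLock, Weak};

/// Hypothetical stand-in for the resolver: holds the current value behind a lock.
struct Holder {
    current: RwLock<Arc<String>>,
}

/// Outcome of a single reload attempt (illustrative only).
#[derive(Debug, PartialEq)]
enum Tick {
    OwnerGone,
    Unchanged,
    Reloaded,
    Failed,
}

/// One iteration of the reload loop.
fn reload_tick(weak: &Weak<Holder>, load: impl FnOnce() -> Result<String, String>) -> Tick {
    // Holding only a Weak lets the background loop end once the owner is dropped.
    let Some(holder) = weak.upgrade() else {
        return Tick::OwnerGone;
    };
    match load() {
        Ok(new_value) => {
            let mut slot = holder.current.write().unwrap();
            if **slot == new_value {
                Tick::Unchanged
            } else {
                // Swap in the new value; readers holding the old Arc are unaffected.
                *slot = Arc::new(new_value);
                Tick::Reloaded
            }
        }
        // On failure the previous value keeps being served.
        Err(_) => Tick::Failed,
    }
}
```

A caller would invoke `reload_tick` on every timer tick and break out of the loop on `Tick::OwnerGone`.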


@@ -61,6 +61,9 @@ pub struct ConfigToml {
pub listen_https_addr: Option<String>,
pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf,
#[serde(with = "humantime_serde")]
pub ssl_cert_reload_period: Duration,
pub ssl_ca_file: Option<Utf8PathBuf>,
pub availability_zone: Option<String>,
#[serde(with = "humantime_serde")]
pub wait_lsn_timeout: Duration,
@@ -282,12 +285,6 @@ pub struct TenantConfigToml {
/// Level0 delta layer threshold at which to stall layer flushes. Must be >compaction_threshold
/// to avoid deadlock. 0 to disable. Disabled by default.
pub l0_flush_stall_threshold: Option<usize>,
/// If true, Level0 delta layer flushes will wait for S3 upload before flushing the next
/// layer. This is a temporary backpressure mechanism which should be removed once
/// l0_flush_{delay,stall}_threshold is fully enabled.
///
/// TODO: this is no longer enabled, remove it when the config option is no longer set.
pub l0_flush_wait_upload: bool,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is #of bytes of WAL.
@@ -439,6 +436,8 @@ impl Default for ConfigToml {
listen_https_addr: (None),
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_file: None,
availability_zone: (None),
wait_lsn_timeout: (humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
@@ -574,8 +573,6 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
crate::models::CompactionAlgorithm::Legacy;
pub const DEFAULT_L0_FLUSH_WAIT_UPLOAD: bool = false;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
@@ -622,7 +619,6 @@ impl Default for TenantConfigToml {
compaction_l0_semaphore: DEFAULT_COMPACTION_L0_SEMAPHORE,
l0_flush_delay_threshold: None,
l0_flush_stall_threshold: None,
l0_flush_wait_upload: DEFAULT_L0_FLUSH_WAIT_UPLOAD,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),


@@ -523,8 +523,6 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_stall_threshold: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_wait_upload: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_horizon: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_period: FieldPatch<String>,
@@ -614,9 +612,6 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub l0_flush_stall_threshold: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub l0_flush_wait_upload: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub gc_horizon: Option<u64>,
@@ -712,7 +707,6 @@ impl TenantConfig {
mut compaction_l0_semaphore,
mut l0_flush_delay_threshold,
mut l0_flush_stall_threshold,
mut l0_flush_wait_upload,
mut gc_horizon,
mut gc_period,
mut image_creation_threshold,
@@ -765,7 +759,6 @@ impl TenantConfig {
patch
.l0_flush_stall_threshold
.apply(&mut l0_flush_stall_threshold);
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
patch.gc_horizon.apply(&mut gc_horizon);
patch
.gc_period
@@ -844,7 +837,6 @@ impl TenantConfig {
compaction_l0_semaphore,
l0_flush_delay_threshold,
l0_flush_stall_threshold,
l0_flush_wait_upload,
gc_horizon,
gc_period,
image_creation_threshold,
@@ -911,9 +903,6 @@ impl TenantConfig {
l0_flush_stall_threshold: self
.l0_flush_stall_threshold
.or(global_conf.l0_flush_stall_threshold),
l0_flush_wait_upload: self
.l0_flush_wait_upload
.unwrap_or(global_conf.l0_flush_wait_upload),
gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
image_creation_threshold: self
@@ -1364,6 +1353,12 @@ pub enum TimelineArchivalState {
Unarchived,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
pub enum TimelineVisibilityState {
Visible,
Invisible,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct TimelineArchivalConfigRequest {
pub state: TimelineArchivalState,
@@ -1496,6 +1491,9 @@ pub struct TimelineInfo {
/// The status of the rel_size migration.
pub rel_size_migration: Option<RelSizeMigration>,
/// Whether the timeline is invisible in synthetic size calculations.
pub is_invisible: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]


@@ -8,10 +8,9 @@ license = "MIT/Apache-2.0"
bytes.workspace = true
fallible-iterator.workspace = true
futures-util = { workspace = true, features = ["sink"] }
log = "0.4"
tracing.workspace = true
parking_lot.workspace = true
pin-project-lite.workspace = true
phf = "0.11"
postgres-protocol2 = { path = "../postgres-protocol2" }
postgres-types2 = { path = "../postgres-types2" }
tokio = { workspace = true, features = ["io-util", "time", "net"] }


@@ -6,13 +6,13 @@ use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, Stream, ready};
use log::{info, trace};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tokio_util::sync::PollSender;
use tracing::{info, trace};
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::error::DbError;

File diff suppressed because it is too large


@@ -5,9 +5,9 @@ use std::sync::Arc;
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, pin_mut};
use log::debug;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;


@@ -7,11 +7,11 @@ use std::task::{Context, Poll};
use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::{Level, debug, log_enabled};
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
@@ -36,7 +36,7 @@ where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let buf = if log_enabled!(Level::Debug) {
let buf = if tracing::enabled!(tracing::Level::DEBUG) {
let params = params.into_iter().collect::<Vec<_>>();
debug!(
"executing statement {} with parameters: {:?}",


@@ -6,10 +6,10 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;


@@ -7,7 +7,7 @@ use http_utils::error::HttpErrorBody;
use pageserver_api::models::*;
use pageserver_api::shard::TenantShardId;
pub use reqwest::Body as ReqwestBody;
use reqwest::{Certificate, IntoUrl, Method, StatusCode, Url};
use reqwest::{IntoUrl, Method, StatusCode, Url};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -39,8 +39,8 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
#[error("request timed out: {0}")]
Timeout(String),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -72,24 +72,7 @@ pub enum ForceAwaitLogicalSize {
}
impl Client {
pub fn new(
mgmt_api_endpoint: String,
jwt: Option<&str>,
ssl_ca_cert: Option<Certificate>,
) -> Result<Self> {
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build().map_err(Error::CreateClient)?;
Ok(Self::from_client(http_client, mgmt_api_endpoint, jwt))
}
pub fn from_client(
client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
pub fn new(client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),


@@ -34,10 +34,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(


@@ -75,10 +75,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(


@@ -123,10 +123,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;


@@ -81,10 +81,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;


@@ -38,10 +38,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(


@@ -12,6 +12,7 @@ use std::time::Duration;
use anyhow::{Context, anyhow};
use camino::Utf8Path;
use clap::{Arg, ArgAction, Command};
use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
use metrics::set_build_info_metric;
use nix::sys::socket::{setsockopt, sockopt};
@@ -234,6 +235,7 @@ fn initialize_config(
.context("build toml deserializer")?,
)
.context("deserialize config toml")?;
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
.context("runtime-validation of config toml")?;
@@ -427,7 +429,7 @@ fn start_pageserver(
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
StorageControllerUpcallClient::new(conf, &shutdown_pageserver),
StorageControllerUpcallClient::new(conf, &shutdown_pageserver)?,
conf,
);
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
@@ -621,12 +623,15 @@ fn start_pageserver(
let https_task = match https_listener {
Some(https_listener) => {
let certs = http_utils::tls_certs::load_cert_chain(&conf.ssl_cert_file)?;
let key = http_utils::tls_certs::load_private_key(&conf.ssl_key_file)?;
let resolver = MGMT_REQUEST_RUNTIME.block_on(ReloadingCertificateResolver::new(
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,
))?;
let server_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)?;
.with_cert_resolver(resolver);
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));


@@ -17,7 +17,7 @@ use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use postgres_backend::AuthType;
use remote_storage::{RemotePath, RemoteStorageConfig};
use reqwest::Url;
use reqwest::{Certificate, Url};
use storage_broker::Uri;
use utils::id::{NodeId, TimelineId};
use utils::logging::{LogFormat, SecretString};
@@ -43,7 +43,7 @@ use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, virtual_file};
///
/// For fields that require additional validation or filling in of defaults at runtime,
/// check for examples in the [`PageServerConf::parse_and_validate`] method.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone)]
pub struct PageServerConf {
// Identifier of this particular pageserver so that, e.g., safekeepers
// can safely distinguish different pageservers
@@ -56,8 +56,17 @@ pub struct PageServerConf {
/// Example: 127.0.0.1:9899
pub listen_https_addr: Option<String>,
/// Path to a file with certificate's private key for https API.
/// Default: server.key
pub ssl_key_file: Utf8PathBuf,
/// Path to a file with an X509 certificate for https API.
/// Default: server.crt
pub ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
/// Default: 60s.
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificates to use in https APIs.
pub ssl_ca_certs: Vec<Certificate>,
/// Current availability zone. Used for traffic metrics.
pub availability_zone: Option<String>,
@@ -325,6 +334,8 @@ impl PageServerConf {
listen_https_addr,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,
ssl_ca_file,
availability_zone,
wait_lsn_timeout,
wal_redo_timeout,
@@ -386,6 +397,7 @@ impl PageServerConf {
listen_https_addr,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,
availability_zone,
wait_lsn_timeout,
wal_redo_timeout,
@@ -469,6 +481,13 @@ impl PageServerConf {
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
load_previous_heatmap: load_previous_heatmap.unwrap_or(true),
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(true),
ssl_ca_certs: match ssl_ca_file {
Some(ssl_ca_file) => {
let buf = std::fs::read(ssl_ca_file)?;
Certificate::from_pem_bundle(&buf)?
}
None => Vec::new(),
},
};
// ------------------------------------------------------------


@@ -50,10 +50,13 @@ pub trait StorageControllerUpcallApi {
impl StorageControllerUpcallClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
pub fn new(
conf: &'static PageServerConf,
cancel: &CancellationToken,
) -> Result<Option<Self>, reqwest::Error> {
let mut url = match conf.control_plane_api.as_ref() {
Some(u) => u.clone(),
None => return None,
None => return Ok(None),
};
if let Ok(mut segs) = url.path_segments_mut() {
@@ -73,12 +76,16 @@ impl StorageControllerUpcallClient {
client = client.default_headers(headers);
}
Some(Self {
http_client: client.build().expect("Failed to construct HTTP client"),
for ssl_ca_cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(ssl_ca_cert.clone());
}
Ok(Some(Self {
http_client: client.build()?,
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),
})
}))
}
#[tracing::instrument(skip_all)]


@@ -669,6 +669,13 @@ paths:
Detach a timeline from its ancestor and reparent all of the ancestor's timelines with a lower `ancestor_lsn`.
The current implementation might not be retryable across failure cases, but will be enhanced in the future.
Detaching should be expected to be an expensive operation. Timeouts should be retried.
parameters:
- name: detach_behavior
in: query
required: false
schema:
description: Currently valid values are `v1`, `v2`
type: string
responses:
"200":
description: |
@@ -1079,6 +1086,7 @@ components:
- last_record_lsn
- disk_consistent_lsn
- state
- min_readable_lsn
properties:
timeline_id:
type: string
@@ -1125,6 +1133,40 @@ components:
applied_gc_cutoff_lsn:
type: string
format: hex
safekeepers:
$ref: "#/components/schemas/TimelineSafekeepersInfo"
TimelineSafekeepersInfo:
type: object
required:
- tenant_id
- timeline_id
- generation
- safekeepers
properties:
tenant_id:
type: string
format: hex
timeline_id:
type: string
format: hex
generation:
type: integer
safekeepers:
type: array
items:
$ref: "#/components/schemas/TimelineSafekeeperInfo"
TimelineSafekeeperInfo:
type: object
required:
- id
- hostname
properties:
id:
type: integer
hostname:
type: string
SyntheticSizeResponse:
type: object


@@ -37,8 +37,8 @@ use pageserver_api::models::{
TenantShardSplitResponse, TenantSorting, TenantState, TenantWaitLsnRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateRequestMode,
TimelineCreateRequestModeImportPgdata, TimelineGcRequest, TimelineInfo,
TimelinePatchIndexPartRequest, TimelinesInfoAndOffloaded, TopTenantShardItem,
TopTenantShardsRequest, TopTenantShardsResponse,
TimelinePatchIndexPartRequest, TimelineVisibilityState, TimelinesInfoAndOffloaded,
TopTenantShardItem, TopTenantShardsRequest, TopTenantShardsResponse,
};
use pageserver_api::shard::{ShardCount, TenantShardId};
use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError};
@@ -439,6 +439,7 @@ async fn build_timeline_info_common(
let remote_consistent_lsn_visible = timeline
.get_remote_consistent_lsn_visible()
.unwrap_or(Lsn(0));
let is_invisible = timeline.remote_client.is_invisible().unwrap_or(false);
let walreceiver_status = timeline.walreceiver_status();
@@ -482,6 +483,7 @@ async fn build_timeline_info_common(
state,
is_archived: Some(is_archived),
rel_size_migration: Some(timeline.get_rel_size_v2_status()),
is_invisible: Some(is_invisible),
walreceiver_status,
};
@@ -2254,7 +2256,6 @@ async fn timeline_compact_handler(
let state = get_state(&request);
let mut flags = EnumSet::empty();
flags |= CompactFlags::NoYield; // run compaction to completion
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
@@ -2333,6 +2334,28 @@ async fn timeline_compact_handler(
.await
}
async fn timeline_mark_invisible_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let timeline = tenant.get_timeline(timeline_id, true)?;
timeline.remote_client.schedule_index_upload_for_timeline_invisible_state(TimelineVisibilityState::Invisible).map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_timeline_mark_invisible", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}
// Run offload immediately on given timeline.
async fn timeline_offload_handler(
request: Request<Body>,
@@ -2393,7 +2416,6 @@ async fn timeline_checkpoint_handler(
let state = get_state(&request);
let mut flags = EnumSet::empty();
flags |= CompactFlags::NoYield; // run compaction to completion
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
}
@@ -3750,6 +3772,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload",
|r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/mark_invisible",
|r| api_handler(r, timeline_mark_invisible_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
|r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),


@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
use enum_map::{Enum as _, EnumMap};
use futures::Future;
use metrics::{
Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
@@ -499,15 +499,6 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});
static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_flush_wait_upload_seconds",
"Time spent waiting for preceding uploads during layer flush",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
@@ -2864,7 +2855,6 @@ pub(crate) struct TimelineMetrics {
timeline_id: String,
pub flush_time_histo: StorageTimeMetrics,
pub flush_delay_histo: StorageTimeMetrics,
pub flush_wait_upload_time_gauge: Gauge,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
@@ -2916,9 +2906,6 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let compact_time_histo = StorageTimeMetrics::new(
StorageTimeOperation::Compact,
&tenant_id,
@@ -3046,7 +3033,6 @@ impl TimelineMetrics {
timeline_id,
flush_time_histo,
flush_delay_histo,
flush_wait_upload_time_gauge,
compact_time_histo,
create_images_time_histo,
logical_size_histo,
@@ -3096,14 +3082,6 @@ impl TimelineMetrics {
self.resident_physical_size_gauge.get()
}
pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) {
self.flush_wait_upload_time_gauge.add(duration);
crate::metrics::FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id])
.unwrap()
.add(duration);
}
/// Generates TIMELINE_LAYER labels for a persistent layer.
fn make_layer_labels(&self, layer_desc: &PersistentLayerDesc) -> [&str; 5] {
let level = match LayerMap::is_l0(&layer_desc.key_range, layer_desc.is_delta()) {
@@ -3207,7 +3185,6 @@ impl TimelineMetrics {
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());


@@ -237,7 +237,7 @@ pub async fn libpq_listener_main(
type ConnectionHandlerResult = anyhow::Result<()>;
#[instrument(skip_all, fields(peer_addr, application_name))]
#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
conf: &'static PageServerConf,
@@ -2512,6 +2512,58 @@ impl PageServiceCmd {
}
}
/// Parse the startup options from the postgres wire protocol startup packet.
///
/// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
/// on a best-effort basis and returns all options parsed so far (key-value pairs) and a bool
/// that is `true` if a parse error occurred. There could be duplicates in the options
/// if the caller passed such parameters.
fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
let mut parsing_config = false;
let mut has_error = false;
let mut config = Vec::new();
for item in options.split_whitespace() {
if item == "-c" {
if !parsing_config {
parsing_config = true;
} else {
// "-c" followed by another "-c"
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
}
} else if item.starts_with("-c") || parsing_config {
let Some((mut key, value)) = item.split_once('=') else {
// "-c" followed by an invalid option
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
};
if !parsing_config {
// Parse "-coption=X"
let Some(stripped_key) = key.strip_prefix("-c") else {
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
};
key = stripped_key;
}
config.push((key.to_string(), value.to_string()));
parsing_config = false;
} else {
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
}
}
if parsing_config {
// "-c" without the option
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
}
(config, has_error)
}
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
@@ -2556,6 +2608,14 @@ where
if let Some(app_name) = params.get("application_name") {
Span::current().record("application_name", field::display(app_name));
}
if let Some(options) = params.get("options") {
let (config, _) = parse_options(options);
for (key, value) in config {
if key == "neon.compute_mode" {
Span::current().record("compute_mode", field::display(value));
}
}
}
};
Ok(())
@@ -2669,6 +2729,7 @@ where
PageServiceCmd::Set => {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
// TODO: allow setting options, i.e., application_name/compute_mode via SET commands
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
PageServiceCmd::LeaseLsn(LeaseLsnCmd {
@@ -2943,4 +3004,46 @@ mod tests {
let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
assert!(cmd.is_err());
}
#[test]
fn test_parse_options() {
let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
assert!(!has_error);
assert_eq!(
config,
vec![("neon.compute_mode".to_string(), "primary".to_string())]
);
let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
assert!(!has_error);
assert_eq!(
config,
vec![
("neon.compute_mode".to_string(), "primary".to_string()),
("foo".to_string(), "bar".to_string()),
]
);
let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
assert!(!has_error);
assert_eq!(
config,
vec![
("neon.compute_mode".to_string(), "primary".to_string()),
("foo".to_string(), "bar".to_string()),
]
);
let (_, has_error) = parse_options("-c");
assert!(has_error);
let (_, has_error) = parse_options("-c foo=bar -c -c");
assert!(has_error);
let (_, has_error) = parse_options(" ");
assert!(!has_error);
let (_, has_error) = parse_options(" -c neon.compute_mode");
assert!(has_error);
}
}
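
For illustration, the `-c key=value` / `-ckey=value` handling of `parse_options` can be sketched without the `tracing` dependency. The name `parse_startup_options` and the `Result`-returning signature are assumptions made for this example; the function in this diff instead returns `(Vec<(String, String)>, bool)`, where the bool is `has_error`, and keeps the pairs parsed before the error.

```rust
/// Hypothetical, simplified variant of the parser in this diff: it returns
/// `Err` on the first malformed item instead of a `has_error` flag.
fn parse_startup_options(options: &str) -> Result<Vec<(String, String)>, String> {
    let mut pending_c = false; // true right after a bare "-c"
    let mut config = Vec::new();
    for item in options.split_whitespace() {
        if item == "-c" {
            if pending_c {
                return Err("\"-c\" followed by another \"-c\"".to_string());
            }
            pending_c = true;
            continue;
        }
        // Accept either the token after a bare "-c", or the glued "-ckey=value" form.
        let kv = if pending_c {
            item
        } else if let Some(rest) = item.strip_prefix("-c") {
            rest
        } else {
            return Err(format!("unexpected item: {item}"));
        };
        let Some((key, value)) = kv.split_once('=') else {
            return Err(format!("missing '=' in: {item}"));
        };
        config.push((key.to_string(), value.to_string()));
        pending_c = false;
    }
    if pending_c {
        return Err("trailing \"-c\" without an option".to_string());
    }
    Ok(config)
}
```

A caller interested only in `neon.compute_mode` could iterate over the returned pairs and pick the matching key, as the startup handler in this diff does.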


@@ -38,6 +38,7 @@ use std::panic::AssertUnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::FutureExt;
use once_cell::sync::Lazy;
@@ -584,18 +585,25 @@ pub async fn shutdown_tasks(
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
.await
.is_err()
{
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for task {} to shut down", task.name);
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
let _ = join_handle.await;
loop {
tokio::select! {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
_ = &mut join_handle => break,
_ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
}
}
info!("task {} completed", task.name);
}
} else {
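
The `shutdown_tasks` change above waits for a task with a short initial timeout and then logs periodically until the task finishes. A rough, std-only sketch of the same waiting pattern is shown below. It is not the code from this diff: it assumes a `std::sync::mpsc` channel as the completion signal instead of a tokio `JoinHandle` with `tokio::select!`, and `wait_with_complaints` is a hypothetical helper name.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Blocks until `done` yields a value or its sender is dropped, invoking
/// `complain` once after `initial` and then once per `periodic` interval.
/// Returns how many times `complain` was called.
fn wait_with_complaints(
    done: &Receiver<()>,
    initial: Duration,
    periodic: Duration,
    mut complain: impl FnMut(&str),
) -> usize {
    let mut complaints = 0;
    let mut timeout = initial;
    loop {
        match done.recv_timeout(timeout) {
            // Finished, or the sender is gone: stop waiting either way.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return complaints,
            Err(RecvTimeoutError::Timeout) => {
                complain(if complaints == 0 {
                    "waiting for task to shut down"
                } else {
                    "still waiting for task to shut down"
                });
                complaints += 1;
                // After the first complaint, switch to the longer period.
                timeout = periodic;
            }
        }
    }
}
```

Using a short first timeout keeps quick shutdowns silent, while the longer period bounds the log volume for tasks that take a long time to stop.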


@@ -3080,6 +3080,7 @@ impl Tenant {
let mut has_pending_l0 = false;
for timeline in compact_l0 {
let ctx = &ctx.with_scope_timeline(&timeline);
// NB: don't set CompactFlags::YieldForL0, since this is an L0-only compaction pass.
let outcome = timeline
.compact(cancel, CompactFlags::OnlyL0Compaction.into(), ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))
@@ -3097,14 +3098,9 @@ impl Tenant {
}
}
// Pass 2: image compaction and timeline offloading. If any timelines have accumulated
// more L0 layers, they may also be compacted here.
//
// NB: image compaction may yield if there is pending L0 compaction.
//
// TODO: it will only yield if there is pending L0 compaction on the same timeline. If a
// different timeline needs compaction, it won't. It should check `l0_compaction_trigger`.
// We leave this for a later PR.
// Pass 2: image compaction and timeline offloading. If any timelines have accumulated more
// L0 layers, they may also be compacted here. Image compaction will yield if there is
// pending L0 compaction on any tenant timeline.
//
// TODO: consider ordering timelines by some priority, e.g. time since last full compaction,
// amount of L1 delta debt or garbage, offload-eligible timelines first, etc.
@@ -3115,8 +3111,14 @@ impl Tenant {
}
let ctx = &ctx.with_scope_timeline(&timeline);
// Yield for L0 if the separate L0 pass is enabled (otherwise there's no point).
let mut flags = EnumSet::default();
if self.get_compaction_l0_first() {
flags |= CompactFlags::YieldForL0;
}
let mut outcome = timeline
.compact(cancel, EnumSet::default(), ctx)
.compact(cancel, flags, ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))
.await
.inspect_err(|err| self.maybe_trip_compaction_breaker(err))?;
@@ -6516,11 +6518,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;
let mut writer = tline.writer().await;
@@ -6537,11 +6535,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;
let mut writer = tline.writer().await;
@@ -6558,11 +6552,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;
let mut writer = tline.writer().await;
@@ -6579,11 +6569,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;
assert_eq!(
@@ -6666,9 +6652,7 @@ mod tests {
timeline.freeze_and_flush().await?;
if compact {
// this requires timeline to be &Arc<Timeline>
timeline
.compact(&cancel, CompactFlags::NoYield.into(), ctx)
.await?;
timeline.compact(&cancel, EnumSet::default(), ctx).await?;
}
// this doesn't really need to use the timeline_id target, but it is closer to what it
@@ -6995,7 +6979,6 @@ mod tests {
child_timeline.freeze_and_flush().await?;
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
child_timeline
.compact(&CancellationToken::new(), flags, &ctx)
.await?;
@@ -7374,9 +7357,7 @@ mod tests {
// Perform a cycle of flush, compact, and GC
tline.freeze_and_flush().await?;
tline
.compact(&cancel, CompactFlags::NoYield.into(), &ctx)
.await?;
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
@@ -7705,7 +7686,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
} else {
EnumSet::empty()
@@ -7756,9 +7736,7 @@ mod tests {
let before_num_l0_delta_files =
tline.layers.read().await.layer_map()?.level0_deltas().len();
tline
.compact(&cancel, CompactFlags::NoYield.into(), &ctx)
.await?;
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();
@@ -7923,7 +7901,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,
@@ -8386,7 +8363,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,
@@ -8454,7 +8430,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,
@@ -11551,4 +11526,255 @@ mod tests {
Ok(())
}
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_synthetic_size_calculation_with_invisible_branches() -> anyhow::Result<()> {
use pageserver_api::models::TimelineVisibilityState;
use crate::tenant::size::gather_inputs;
let tenant_conf = pageserver_api::models::TenantConfig {
// Ensure that we don't compute gc_cutoffs (which needs reading the layer files)
pitr_interval: Some(Duration::ZERO),
..Default::default()
};
let harness = TenantHarness::create_custom(
"test_synthetic_size_calculation_with_invisible_branches",
tenant_conf,
TenantId::generate(),
ShardIdentity::unsharded(),
Generation::new(0xdeadbeef),
)
.await?;
let (tenant, ctx) = harness.load().await;
let main_tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![],
vec![],
vec![],
Lsn(0x100),
)
.await?;
let snapshot1 = TimelineId::from_array(hex!("11223344556677881122334455667790"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot1,
Some(Lsn(0x20)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot2 = TimelineId::from_array(hex!("11223344556677881122334455667791"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot2,
Some(Lsn(0x30)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot3 = TimelineId::from_array(hex!("11223344556677881122334455667792"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot3,
Some(Lsn(0x40)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let limit = Arc::new(Semaphore::new(1));
let max_retention_period = None;
let mut logical_size_cache = HashMap::new();
let cause = LogicalSizeCalculationCause::EvictionTaskImitation;
let cancel = CancellationToken::new();
let inputs = gather_inputs(
&tenant,
&limit,
max_retention_period,
&mut logical_size_cache,
cause,
&cancel,
&ctx,
)
.instrument(info_span!(
"gather_inputs",
tenant_id = "unknown",
shard_id = "unknown",
))
.await?;
use crate::tenant::size::{LsnKind, ModelInputs, SegmentMeta};
use LsnKind::*;
use tenant_size_model::Segment;
let ModelInputs { mut segments, .. } = inputs;
segments.retain(|s| s.timeline_id == TIMELINE_ID);
for segment in segments.iter_mut() {
segment.segment.parent = None; // We don't care about the parent for the test
segment.segment.size = None; // We don't care about the size for the test
}
assert_eq!(
segments,
[
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x10,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchStart,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x20,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x30,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x100,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: GcCutOff,
}, // we need to retain everything above the last branch point
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x100,
size: None,
needed: true,
},
timeline_id: TIMELINE_ID,
kind: BranchEnd,
},
]
);
main_tline
.remote_client
.schedule_index_upload_for_timeline_invisible_state(
TimelineVisibilityState::Invisible,
)?;
main_tline.remote_client.wait_completion().await?;
let inputs = gather_inputs(
&tenant,
&limit,
max_retention_period,
&mut logical_size_cache,
cause,
&cancel,
&ctx,
)
.instrument(info_span!(
"gather_inputs",
tenant_id = "unknown",
shard_id = "unknown",
))
.await?;
let ModelInputs { mut segments, .. } = inputs;
segments.retain(|s| s.timeline_id == TIMELINE_ID);
for segment in segments.iter_mut() {
segment.segment.parent = None; // We don't care about the parent for the test
segment.segment.size = None; // We don't care about the size for the test
}
assert_eq!(
segments,
[
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x10,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchStart,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x20,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x30,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40, // Branch end LSN == last branch point LSN
size: None,
needed: true,
},
timeline_id: TIMELINE_ID,
kind: BranchEnd,
},
]
);
Ok(())
}
}
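
The two expected segment lists in the test above differ only in the `GcCutOff` point and the LSN of `BranchEnd`. The following is a minimal, simplified sketch of that layout rule, not the actual `gather_inputs` code: `Kind` and `segment_points` are hypothetical names, LSNs are plain `u64` values, and lease points are left out.

```rust
/// Point kinds, named after the `LsnKind` variants used in the test above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Kind {
    BranchStart,
    BranchPoint,
    GcCutOff,
    BranchEnd,
}

/// Hypothetical, simplified model of the points recorded for one timeline.
fn segment_points(
    branch_start: u64,
    branch_points: &[u64],
    pitr_cutoff: u64,
    last_record_lsn: u64,
    invisible: bool,
) -> Vec<(u64, Kind)> {
    let mut lsns: Vec<(u64, Kind)> = branch_points
        .iter()
        .map(|&lsn| (lsn, Kind::BranchPoint))
        .collect();
    // The GC cutoff is only recorded for visible timelines, and only when it
    // lies past the branch start.
    if pitr_cutoff > branch_start && !invisible {
        lsns.push((pitr_cutoff, Kind::GcCutOff));
    }
    lsns.sort_by_key(|&(lsn, _)| lsn);

    let mut points = vec![(branch_start, Kind::BranchStart)];
    points.extend(lsns);

    // An invisible timeline ends at its last recorded point (typically the
    // last branch point); a visible one ends at the last record LSN.
    let branch_end = if invisible {
        points.last().expect("starts with BranchStart").0
    } else {
        last_record_lsn
    };
    points.push((branch_end, Kind::BranchEnd));
    points
}
```

Called with the test's values (`branch_start = 0x10`, branch points `0x20`, `0x30`, `0x40`, cutoff and last record LSN `0x100`), the visible case ends at `0x100` after a `GcCutOff` point, while the invisible case ends at `0x40` with no `GcCutOff`.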

View File

@@ -344,7 +344,7 @@ async fn init_load_generations(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel) {
} else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel)? {
info!("Calling {} API to re-attach tenants", client.base_url());
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach(conf).await {

View File

@@ -194,7 +194,7 @@ pub(crate) use download::{
};
use index::GcCompactionState;
pub(crate) use index::LayerFileMetadata;
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState};
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use regex::Regex;
use remote_storage::{
@@ -573,6 +573,16 @@ impl RemoteTimelineClient {
.ok()
}
/// Returns true if the timeline is invisible in synthetic size calculations.
pub(crate) fn is_invisible(&self) -> Option<bool> {
self.upload_queue
.lock()
.unwrap()
.initialized_mut()
.map(|q| q.clean.0.marked_invisible_at.is_some())
.ok()
}
/// Returns `Ok(Some(timestamp))` if the timeline has been archived, `Ok(None)` if the timeline hasn't been archived.
///
/// Return Err(_) if the remote index_part hasn't been downloaded yet, or the timeline hasn't been stopped yet.
@@ -845,6 +855,37 @@ impl RemoteTimelineClient {
Ok(need_wait)
}
pub(crate) fn schedule_index_upload_for_timeline_invisible_state(
self: &Arc<Self>,
state: TimelineVisibilityState,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
fn need_change(
marked_invisible_at: &Option<NaiveDateTime>,
state: TimelineVisibilityState,
) -> Option<bool> {
match (marked_invisible_at, state) {
// Already in the requested state: nothing to upload.
(Some(_), TimelineVisibilityState::Invisible) => None,
(None, TimelineVisibilityState::Invisible) => Some(true),
(Some(_), TimelineVisibilityState::Visible) => Some(false),
(None, TimelineVisibilityState::Visible) => None,
}
}
let need_upload_scheduled = need_change(&upload_queue.dirty.marked_invisible_at, state);
if let Some(marked_invisible_at_set) = need_upload_scheduled {
let intended_marked_invisible_at =
marked_invisible_at_set.then(|| Utc::now().naive_utc());
upload_queue.dirty.marked_invisible_at = intended_marked_invisible_at;
self.schedule_index_upload(upload_queue);
}
Ok(())
}
/// Shuts the timeline client down, but only if the timeline is archived.
///
/// This function and [`Self::schedule_index_upload_for_timeline_archival_state`] use the
@@ -1927,9 +1968,7 @@ impl RemoteTimelineClient {
/// Pick next tasks from the queue, and start as many of them as possible without violating
/// the ordering constraints.
///
/// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does.
/// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
/// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
/// The number of in-progress tasks is limited by `Self::inprogress_tasks`; see `next_ready`.
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
debug!("starting op: {next_op}");
@@ -2177,6 +2216,11 @@ impl RemoteTimelineClient {
}
res
}
// TODO: this should wait for the deletion to be executed by the deletion queue.
// Otherwise, the deletion may race with an upload and wrongfully delete a newer
// file. Some of the above logic attempts to work around this, it should be replaced
// by the upload queue ordering guarantees (see `can_bypass`). See:
// <https://github.com/neondatabase/neon/issues/10283>.
UploadOp::Delete(delete) => {
if self.config.read().unwrap().block_deletions {
let mut queue_locked = self.upload_queue.lock().unwrap();
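
The visibility state in this file is stored as an optional timestamp (`marked_invisible_at`). As an illustration only, here is a standalone sketch of an idempotent transition over such a field, assuming that a request matching the stored state should leave the timestamp untouched. `Visibility` and `apply` are hypothetical names, and `std::time::SystemTime` stands in for `NaiveDateTime` so the example needs no external crates.

```rust
use std::time::SystemTime;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Visibility {
    Visible,
    Invisible,
}

/// Returns `None` when the stored state already matches the request,
/// `Some(true)` when the timestamp should be set, and `Some(false)` when it
/// should be cleared.
fn need_change(marked_invisible_at: Option<SystemTime>, requested: Visibility) -> Option<bool> {
    match (marked_invisible_at, requested) {
        (Some(_), Visibility::Invisible) | (None, Visibility::Visible) => None,
        (None, Visibility::Invisible) => Some(true),
        (Some(_), Visibility::Visible) => Some(false),
    }
}

/// Applies a request to the stored timestamp and reports whether anything
/// changed (that is, whether an index upload would have to be scheduled).
fn apply(marked_invisible_at: &mut Option<SystemTime>, requested: Visibility) -> bool {
    match need_change(*marked_invisible_at, requested) {
        Some(set) => {
            *marked_invisible_at = set.then(SystemTime::now);
            true
        }
        None => false,
    }
}
```

With this shape, marking an already invisible timeline as invisible keeps the original timestamp, and marking a visible timeline as visible is a no-op.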

View File

@@ -110,6 +110,10 @@ pub struct IndexPart {
/// just the specific use case here; it needs a new name.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) gc_compaction: Option<GcCompactionState>,
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
@@ -137,10 +141,11 @@ impl IndexPart {
/// - 11: +rel_size_migration
/// - 12: +l2_lsn
/// - 13: +gc_compaction
const LATEST_VERSION: usize = 13;
/// - 14: +marked_invisible_at
const LATEST_VERSION: usize = 14;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -159,6 +164,7 @@ impl IndexPart {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
}
}
@@ -468,6 +474,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -516,6 +523,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -565,6 +573,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -617,6 +626,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -664,6 +674,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -714,6 +725,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -769,6 +781,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -829,6 +842,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -890,6 +904,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -956,6 +971,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1035,6 +1051,7 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1115,6 +1132,7 @@ mod tests {
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1124,7 +1142,7 @@ mod tests {
#[test]
fn v12_v13_l2_gc_ompaction_is_parsed() {
let example = r#"{
"version": 12,
"version": 13,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
@@ -1160,7 +1178,7 @@ mod tests {
}"#;
let expected = IndexPart {
version: 12,
version: 13,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
@@ -1201,6 +1219,95 @@ mod tests {
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v14_marked_invisible_at_is_parsed() {
let example = r#"{
"version": 14,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123",
"reasons": ["DetachAncestor"]
},
"import_pgdata": {
"V1": {
"Done": {
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
"started_at": "2024-11-13T09:23:42.123",
"finished_at": "2024-11-13T09:42:23.123"
}
}
},
"rel_size_migration": "legacy",
"l2_lsn": "0/16960E8",
"gc_compaction": {
"last_completed_lsn": "0/16960E8"
},
"marked_invisible_at": "2023-07-31T09:00:00.123"
}"#;
let expected = IndexPart {
version: 14,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
}),
last_aux_file_policy: Default::default(),
archived_at: None,
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
}))),
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();

View File

@@ -33,7 +33,7 @@ pub struct ModelInputs {
}
/// A [`Segment`], with some extra information for display purposes
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct SegmentMeta {
pub segment: Segment,
pub timeline_id: TimelineId,
@@ -248,6 +248,8 @@ pub(super) async fn gather_inputs(
None
};
let branch_is_invisible = timeline.is_invisible() == Some(true);
let lease_points = gc_info
.leases
.keys()
@@ -271,7 +273,10 @@ pub(super) async fn gather_inputs(
.map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
.collect::<Vec<_>>();
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
if !branch_is_invisible {
// Do not count lease points for invisible branches.
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
}
drop(gc_info);
@@ -287,7 +292,9 @@ pub(super) async fn gather_inputs(
// Add a point for the PITR cutoff
let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
if !branch_start_needed {
if !branch_start_needed && !branch_is_invisible {
// Only add the GcCutOff point when the timeline is visible; otherwise, do not compute the size for the LSN
// range from the last branch point to the latest data.
lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
}
@@ -373,11 +380,19 @@ pub(super) async fn gather_inputs(
}
}
let branch_end_lsn = if branch_is_invisible {
// If the branch is invisible, the branch end is the last requested LSN (likely a branch cutoff point).
segments.last().unwrap().segment.lsn
} else {
// Otherwise, the branch end is the last record LSN.
last_record_lsn.0
};
// Current end of the timeline
segments.push(SegmentMeta {
segment: Segment {
parent: Some(parent),
lsn: last_record_lsn.0,
lsn: branch_end_lsn,
size: None, // Filled in later, if necessary
needed: true,
},
@@ -609,6 +624,7 @@ async fn calculate_logical_size(
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}
#[cfg(test)]
#[test]
fn verify_size_for_multiple_branches() {
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
@@ -766,6 +782,7 @@ fn verify_size_for_multiple_branches() {
assert_eq!(inputs.calculate(), 37_851_408);
}
#[cfg(test)]
#[test]
fn verify_size_for_one_branch() {
let doc = r#"

View File

@@ -84,8 +84,8 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
use super::remote_timeline_client::index::{GcCompactionState, IndexPart};
use super::remote_timeline_client::{RemoteTimelineClient, WaitCompletionError};
use super::secondary::heatmap::HeatMapLayer;
use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::tasks::log_compaction_error;
@@ -870,9 +870,14 @@ pub(crate) enum CompactFlags {
OnlyL0Compaction,
EnhancedGcBottomMostCompaction,
DryRun,
/// Disables compaction yielding e.g. due to high L0 count. This is set e.g. when requesting
/// compaction via HTTP API.
NoYield,
/// Makes image compaction yield if there's pending L0 compaction. This should always be used in
/// the background compaction task, since we want to aggressively compact down L0 to bound
/// read amplification.
///
/// It only makes sense to use this when `compaction_l0_first` is enabled (such that we yield to
/// an L0 compaction pass), and without `OnlyL0Compaction` (L0 compaction shouldn't yield for L0
/// compaction).
YieldForL0,
}
#[serde_with::serde_as]
@@ -1891,18 +1896,19 @@ impl Timeline {
// out by other background tasks (including image compaction). We request this via
// `BackgroundLoopKind::L0Compaction`.
//
// If this is a regular compaction pass, and L0-only compaction is enabled in the config,
// then we should yield for immediate L0 compaction if necessary while we're waiting for the
// background task semaphore. There's no point yielding otherwise, since we'd just end up
// right back here.
// Yield for pending L0 compaction while waiting for the semaphore.
let is_l0_only = options.flags.contains(CompactFlags::OnlyL0Compaction);
let semaphore_kind = match is_l0_only && self.get_compaction_l0_semaphore() {
true => BackgroundLoopKind::L0Compaction,
false => BackgroundLoopKind::Compaction,
};
let yield_for_l0 = !is_l0_only
&& self.get_compaction_l0_first()
&& !options.flags.contains(CompactFlags::NoYield);
let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
if yield_for_l0 {
// If this is an L0 pass, it doesn't make sense to yield for L0.
debug_assert!(!is_l0_only, "YieldForL0 during L0 pass");
// If `compaction_l0_first` is disabled, there's no point yielding.
debug_assert!(self.get_compaction_l0_first(), "YieldForL0 without L0 pass");
}
let acquire = async move {
let guard = self.compaction_lock.lock().await;
@@ -2209,6 +2215,10 @@ impl Timeline {
self.remote_client.is_archived()
}
pub(crate) fn is_invisible(&self) -> Option<bool> {
self.remote_client.is_invisible()
}
pub(crate) fn is_stopping(&self) -> bool {
self.current_state() == TimelineState::Stopping
}
@@ -2562,14 +2572,6 @@ impl Timeline {
Some(max(l0_flush_stall_threshold, compaction_threshold))
}
fn get_l0_flush_wait_upload(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.l0_flush_wait_upload
.unwrap_or(self.conf.default_tenant_conf.l0_flush_wait_upload)
}
fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -4591,27 +4593,6 @@ impl Timeline {
// release lock on 'layers'
};
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
// This makes us refuse ingest until the new layers have been persisted to the remote
// TODO: remove this, and rely on l0_flush_{delay,stall}_threshold instead.
if self.get_l0_flush_wait_upload() {
let start = Instant::now();
self.remote_client
.wait_completion()
.await
.map_err(|e| match e {
WaitCompletionError::UploadQueueShutDownOrStopped
| WaitCompletionError::NotInitialized(
NotInitialized::ShuttingDown | NotInitialized::Stopped,
) => FlushLayerError::Cancelled,
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
FlushLayerError::Other(anyhow!(e).into())
}
})?;
let duration = start.elapsed().as_secs_f64();
self.metrics.flush_wait_upload_time_gauge_add(duration);
}
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
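
The change from `NoYield` to `YieldForL0` turns yielding from opt-out into opt-in: callers that pass no flags never yield, and only the background image-compaction pass asks for it. A minimal sketch of that selection follows, using a hypothetical bitmask type `Flags` in place of `EnumSet<CompactFlags>`; the helper names are illustrative and not part of the codebase.

```rust
/// Hypothetical stand-in for a subset of `CompactFlags`, as a plain bitmask.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
struct Flags(u8);

impl Flags {
    const ONLY_L0: Flags = Flags(1 << 0);
    const YIELD_FOR_L0: Flags = Flags(1 << 1);

    fn contains(self, other: Flags) -> bool {
        (self.0 & other.0) == other.0
    }

    fn with(self, other: Flags) -> Flags {
        Flags(self.0 | other.0)
    }
}

/// Flags for the L0-only pass: it never yields to itself.
fn l0_pass_flags() -> Flags {
    Flags::ONLY_L0
}

/// Flags for the image-compaction pass: yield only if a separate L0 pass
/// exists to yield to.
fn full_pass_flags(compaction_l0_first: bool) -> Flags {
    let flags = Flags::default();
    if compaction_l0_first {
        flags.with(Flags::YIELD_FOR_L0)
    } else {
        flags
    }
}

/// A pass yields only when it opted in and L0 work is actually pending.
fn should_yield(flags: Flags, l0_pending: bool) -> bool {
    flags.contains(Flags::YIELD_FOR_L0) && l0_pending
}
```

Under this scheme, tests and HTTP-triggered compactions can pass an empty flag set and get non-yielding behaviour without having to remember a dedicated flag.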

View File

@@ -394,8 +394,8 @@ impl GcCompactionQueue {
if job.dry_run {
flags |= CompactFlags::DryRun;
}
if options.flags.contains(CompactFlags::NoYield) {
flags |= CompactFlags::NoYield;
if options.flags.contains(CompactFlags::YieldForL0) {
flags |= CompactFlags::YieldForL0;
}
let options = CompactOptions {
flags,
@@ -983,7 +983,7 @@ impl Timeline {
// Yield if we have pending L0 compaction. The scheduler will do another pass.
if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
&& !options.flags.contains(CompactFlags::NoYield)
&& options.flags.contains(CompactFlags::YieldForL0)
{
info!("image/ancestor compaction yielding for L0 compaction");
return Ok(CompactionOutcome::YieldForL0);
@@ -1028,7 +1028,7 @@ impl Timeline {
.load()
.as_ref()
.clone(),
!options.flags.contains(CompactFlags::NoYield),
options.flags.contains(CompactFlags::YieldForL0),
)
.await
.inspect_err(|err| {
@@ -2635,7 +2635,7 @@ impl Timeline {
) -> Result<CompactionOutcome, CompactionError> {
let sub_compaction = options.sub_compaction;
let job = GcCompactJob::from_compact_options(options.clone());
let no_yield = options.flags.contains(CompactFlags::NoYield);
let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
if sub_compaction {
info!(
"running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
@@ -2650,7 +2650,7 @@ impl Timeline {
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx, no_yield)
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
.await?;
}
if jobs_len == 0 {
@@ -2658,7 +2658,8 @@ impl Timeline {
}
return Ok(CompactionOutcome::Done);
}
self.compact_with_gc_inner(cancel, job, ctx, no_yield).await
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
.await
}
async fn compact_with_gc_inner(
@@ -2666,7 +2667,7 @@ impl Timeline {
cancel: &CancellationToken,
job: GcCompactJob,
ctx: &RequestContext,
no_yield: bool,
yield_for_l0: bool,
) -> Result<CompactionOutcome, CompactionError> {
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
@@ -2936,18 +2937,15 @@ impl Timeline {
if cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
if !no_yield {
let should_yield = self
let should_yield = yield_for_l0
&& self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!(
"preempt gc-compaction when downloading layers: too many L0 layers"
);
return Ok(CompactionOutcome::YieldForL0);
}
if should_yield {
tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
}
let resident_layer = layer
.download_and_keep_resident(ctx)
@@ -3081,21 +3079,17 @@ impl Timeline {
return Err(CompactionError::ShuttingDown);
}
if !no_yield {
keys_processed += 1;
if keys_processed % 1000 == 0 {
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!(
"preempt gc-compaction in the main loop: too many L0 layers"
);
return Ok(CompactionOutcome::YieldForL0);
}
}
keys_processed += 1;
let should_yield = yield_for_l0
&& keys_processed % 1000 == 0
&& self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
}
if self.shard_identity.is_key_disposable(&key) {
// If this shard does not need to store this key, simply skip it.
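
The main-loop check above polls for pending L0 work only once every 1000 keys, and only when the caller opted in. A simplified sketch of that pattern is shown here, assuming an `AtomicBool` as a stand-in for `l0_compaction_trigger` (the real code polls a notification with `now_or_never()`); `process_keys` and `Outcome` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    Done,
    YieldForL0,
}

/// Walks `num_keys` keys and, if `yield_for_l0` is set, polls `l0_trigger`
/// every `CHECK_EVERY` keys. Returns the outcome and the key counter.
fn process_keys(num_keys: u64, yield_for_l0: bool, l0_trigger: &AtomicBool) -> (Outcome, u64) {
    const CHECK_EVERY: u64 = 1000;
    let mut keys_processed = 0;
    for _key in 0..num_keys {
        keys_processed += 1;
        // Short-circuiting keeps the trigger untouched unless the caller
        // opted in and the counter hits a multiple of CHECK_EVERY.
        let should_yield = yield_for_l0
            && keys_processed % CHECK_EVERY == 0
            && l0_trigger.load(Ordering::Relaxed);
        if should_yield {
            return (Outcome::YieldForL0, keys_processed);
        }
        // Per-key work would go here.
    }
    (Outcome::Done, keys_processed)
}
```

Folding the flag into the boolean expression, as the diff does, keeps the counter increment unconditional and removes one level of nesting.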

View File

@@ -235,7 +235,7 @@ pub(super) async fn prepare(
return Err(NoAncestor);
}
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn, behavior)?;
if let DetachBehavior::MultiLevelAndNoReparent = behavior {
// If the ancestor has an ancestor, we might be able to fast-path detach it if the current ancestor does not have any data written/used by the detaching timeline.
@@ -249,7 +249,13 @@ pub(super) async fn prepare(
ancestor_lsn = ancestor.ancestor_lsn; // Get the LSN first before resetting the `ancestor` variable
ancestor = ancestor_of_ancestor;
// TODO: do we still need to check if we don't want to reparent?
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
check_no_archived_children_of_ancestor(
tenant,
detached,
&ancestor,
ancestor_lsn,
behavior,
)?;
}
} else if ancestor.ancestor_timeline.is_some() {
// non-technical requirement; we could flatten N ancestors just as easily but we chose
@@ -1156,31 +1162,44 @@ fn check_no_archived_children_of_ancestor(
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,
detach_behavior: DetachBehavior,
) -> Result<(), Error> {
let timelines = tenant.timelines.lock().unwrap();
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
for timeline in reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn) {
if timeline.is_archived() == Some(true) {
return Err(Error::Archived(timeline.timeline_id));
}
}
for timeline_offloaded in timelines_offloaded.values() {
if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
continue;
}
// This forbids the detach ancestor feature if flattened timelines are present,
// even if the ancestor_lsn is from after the branchpoint of the detached timeline.
// But as per current design, we don't record the ancestor_lsn of flattened timelines.
// This is a bit unfortunate, but as of writing this we don't support flattening
// anyway. Maybe we can evolve the data model in the future.
if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
let is_earlier = retain_lsn <= ancestor_lsn;
if !is_earlier {
continue;
match detach_behavior {
DetachBehavior::NoAncestorAndReparent => {
let timelines = tenant.timelines.lock().unwrap();
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
for timeline in
reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn)
{
if timeline.is_archived() == Some(true) {
return Err(Error::Archived(timeline.timeline_id));
}
}
for timeline_offloaded in timelines_offloaded.values() {
if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
continue;
}
// This forbids the detach ancestor feature if flattened timelines are present,
// even if the ancestor_lsn is from after the branchpoint of the detached timeline.
// But as per current design, we don't record the ancestor_lsn of flattened timelines.
// This is a bit unfortunate, but as of writing this we don't support flattening
// anyway. Maybe we can evolve the data model in the future.
if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
let is_earlier = retain_lsn <= ancestor_lsn;
if !is_earlier {
continue;
}
}
return Err(Error::Archived(timeline_offloaded.timeline_id));
}
}
return Err(Error::Archived(timeline_offloaded.timeline_id));
DetachBehavior::MultiLevelAndNoReparent => {
// We don't need to check anything if the user requested to not reparent.
}
}
Ok(())
}

View File

@@ -647,18 +647,25 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return found;
}
#if PG_MAJORVERSION_NUM >= 16
static PGIOAlignedBlock voidblock = {0};
#else
static PGAlignedBlock voidblock = {0};
#endif
#define SCRIBBLEPAGE (&voidblock.data)
/*
* Try to read pages from local cache.
* Returns the number of pages read from the local cache, and sets bits in
* 'read' for the pages which were read. This may scribble over buffers not
* marked in 'read', so be careful with operation ordering.
* 'mask' for the pages which were read. This may scribble over buffers not
* marked in 'mask', so be careful with operation ordering.
*
* In case of error local file cache is disabled (lfc->limit is set to zero),
* and -1 is returned. Note that 'read' and the buffers may be touched and in
* an otherwise invalid state.
* and -1 is returned.
*
* If the mask argument is supplied, bits will be set at the offsets of pages
* that were present and read from the LFC.
* If the mask argument is supplied, we'll only try to read those pages which
* don't have their bits set on entry. At exit, pages which were successfully
* read from LFC will have their bits set.
*/
int
lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -693,23 +700,43 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int8 chunk_mask[BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
int n_blocks_to_read = 0;
int n_blocks_to_read = 0;
int iov_last_used = 0;
int first_block_in_chunk_read = -1;
ConditionVariable* cv;
Assert(blocks_in_chunk > 0);
for (int i = 0; i < blocks_in_chunk; i++)
{
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
iov[i].iov_base = buffers[buf_offset + i];
iov[i].iov_len = BLCKSZ;
BITMAP_CLR(mask, buf_offset + i);
/* mask not set = we must do work */
if (!BITMAP_ISSET(mask, buf_offset + i))
{
iov[i].iov_base = buffers[buf_offset + i];
n_blocks_to_read++;
iov_last_used = i + 1;
if (first_block_in_chunk_read == -1)
{
first_block_in_chunk_read = i;
}
}
/* mask set = we must do no work */
else
{
/* don't scribble on pages we weren't requested to write to */
iov[i].iov_base = SCRIBBLEPAGE;
}
}
/* shortcut IO */
if (n_blocks_to_read == 0)
{
buf_offset += blocks_in_chunk;
@@ -718,6 +745,12 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
continue;
}
/*
* The effective iov size must be >= the number of blocks we're about
* to read.
*/
Assert(iov_last_used - first_block_in_chunk_read >= n_blocks_to_read);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -762,10 +795,15 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
generation = lfc_ctl->generation;
entry_offset = entry->offset;
for (int i = 0; i < blocks_in_chunk; i++)
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
/* no need to work on something we're not interested in */
if (BITMAP_ISSET(mask, buf_offset + i))
continue;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -789,7 +827,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
if (state == AVAILABLE)
{
BITMAP_SET(mask, buf_offset + i);
BITMAP_SET(chunk_mask, i);
iteration_hits++;
}
else
@@ -801,16 +839,34 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
/* chunk offset (# of pages) into the LFC file */
off_t first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK;
int nwrite = iov_last_used - first_block_in_chunk_read;
/* offset of first IOV */
first_read_offset += chunk_offs + first_block_in_chunk_read;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
/* Read only the contiguous range of blocks we're interested in */
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
nwrite, first_read_offset * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
if (rc != (BLCKSZ * nwrite))
{
lfc_disable("read");
return -1;
}
/*
* We successfully read the pages we know were valid when we
* started reading; now mark those pages as read
*/
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
{
if (BITMAP_ISSET(chunk_mask, i))
BITMAP_SET(mask, buf_offset + i);
}
}
/* Place entry to the head of LRU list */
@@ -1013,6 +1069,9 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (found)
{
state = GET_STATE(entry, chunk_offs);
@@ -1163,6 +1222,13 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
/* Approximate working set for the blocks assumed in this entry */
for (int i = 0; i < blocks_in_chunk; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
/*

View File

@@ -15,6 +15,7 @@
#include "postgres.h"
#include <math.h>
#include <sys/socket.h>
#include "libpq-int.h"
@@ -50,6 +51,20 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
enum NeonComputeMode {
CP_MODE_PRIMARY = 0,
CP_MODE_REPLICA,
CP_MODE_STATIC
};
static const struct config_enum_entry neon_compute_modes[] = {
{"primary", CP_MODE_PRIMARY, false},
{"replica", CP_MODE_REPLICA, false},
{"static", CP_MODE_STATIC, false},
{NULL, 0, false}
};
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -62,11 +77,13 @@ int flush_every_n_requests = 8;
int neon_protocol_version = 2;
static int neon_compute_mode = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
static int pageserver_response_log_timeout = 10000;
static int pageserver_response_disconnect_timeout = 120000; /* 2 minutes */
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
static int pageserver_response_disconnect_timeout = 150000;
typedef struct
{
@@ -390,9 +407,10 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
case PS_Disconnected:
{
const char *keywords[4];
const char *values[4];
char pid_str[16];
const char *keywords[5];
const char *values[5];
char pid_str[16] = { 0 };
char endpoint_str[36] = { 0 };
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
@@ -464,6 +482,31 @@ pageserver_connect(shardno_t shard_no, int elevel)
n_pgsql_params++;
}
{
bool param_set = false;
switch (neon_compute_mode)
{
case CP_MODE_PRIMARY:
strncpy(endpoint_str, "-c neon.compute_mode=primary", sizeof(endpoint_str));
param_set = true;
break;
case CP_MODE_REPLICA:
strncpy(endpoint_str, "-c neon.compute_mode=replica", sizeof(endpoint_str));
param_set = true;
break;
case CP_MODE_STATIC:
strncpy(endpoint_str, "-c neon.compute_mode=static", sizeof(endpoint_str));
param_set = true;
break;
}
if (param_set)
{
keywords[n_pgsql_params] = "options";
values[n_pgsql_params] = endpoint_str;
n_pgsql_params++;
}
}
keywords[n_pgsql_params] = NULL;
values[n_pgsql_params] = NULL;
@@ -722,6 +765,24 @@ get_socket_stats(int socketfd, int *sndbuf, int *recvbuf)
#endif
}
/*
* Tries to get the local port of a socket. Sets 'port' to -1 on error.
*/
static void
get_local_port(int socketfd, int *port)
{
struct sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
memset(&addr, 0, addr_len);
if (getsockname(socketfd, (struct sockaddr*) &addr, &addr_len) == 0)
{
*port = ntohs(addr.sin_port);
} else {
*port = -1;
}
}
/*
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
@@ -811,15 +872,17 @@ retry:
*/
if (INSTR_TIME_GET_MILLISEC(since_last_log) >= pageserver_response_log_timeout)
{
int port;
int sndbuf;
int recvbuf;
get_local_port(PQsocket(pageserver_conn), &port);
get_socket_stats(PQsocket(pageserver_conn), &sndbuf, &recvbuf);
neon_shard_log(shard_no, LOG,
"no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d) (conn start=%d end=%d)",
"no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket port=%d sndbuf=%d recvbuf=%d) (conn start=%d end=%d)",
INSTR_TIME_GET_DOUBLE(since_start),
shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf,
shard->nrequests_sent, shard->nresponses_received, port, sndbuf, recvbuf,
pageserver_conn->inStart, pageserver_conn->inEnd);
shard->receive_last_log_time = now;
shard->receive_logged = true;
@@ -841,8 +904,10 @@ retry:
*/
if (INSTR_TIME_GET_MILLISEC(since_start) >= pageserver_response_disconnect_timeout)
{
neon_shard_log(shard_no, LOG, "no response from pageserver for %0.3f s, disconnecting",
INSTR_TIME_GET_DOUBLE(since_start));
int port;
get_local_port(PQsocket(pageserver_conn), &port);
neon_shard_log(shard_no, LOG, "no response from pageserver for %0.3f s, disconnecting (socket port=%d)",
INSTR_TIME_GET_DOUBLE(since_start), port);
pageserver_disconnect(shard_no);
return -1;
}
@@ -1077,15 +1142,22 @@ pageserver_try_receive(shardno_t shard_no)
NeonResponse *resp;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
/* read response */
int rc;
int rc;
if (shard->state != PS_Connected)
return NULL;
Assert(pageserver_conn);
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async = true */);
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (!PQconsumeInput(shard->conn))
{
return NULL;
}
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
}
if (rc == 0)
return NULL;
@@ -1365,11 +1437,22 @@ pg_init_libpagestore(void)
"If the pageserver doesn't respond to a request within this timeout, "
"disconnect and reconnect.",
&pageserver_response_disconnect_timeout,
120000, 100, INT_MAX,
150000, 100, INT_MAX,
PGC_SUSET,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"neon.compute_mode",
"The compute endpoint node type",
NULL,
&neon_compute_mode,
CP_MODE_PRIMARY,
neon_compute_modes,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)

View File

@@ -315,7 +315,7 @@ static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 1;
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}

View File

@@ -99,7 +99,7 @@ static char *hexdump_page(char *page);
#define IS_LOCAL_REL(reln) (\
NInfoGetDbOid(InfoFromSMgrRel(reln)) != 0 && \
NInfoGetRelNumber(InfoFromSMgrRel(reln)) > FirstNormalObjectId \
NInfoGetRelNumber(InfoFromSMgrRel(reln)) >= FirstNormalObjectId \
)
const int SmgrTrace = DEBUG5;
@@ -1040,6 +1040,16 @@ prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, n
continue;
}
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
* from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here
* under buffer lock.
*/
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forknum, blocknum + i, buffers[i]);
prefetch_set_unused(ring_index);
BITMAP_SET(mask, i);
@@ -1071,6 +1081,9 @@ prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_r
* pageserver. If NULL, we utilize the lastWrittenLsn -infrastructure
* to calculate the LSNs to send.
*
* Bits set in *mask (if present) indicate pages already read; i.e. pages we
* can skip in this process.
*
* When performing a prefetch rather than a synchronous request,
* is_prefetch==true. Currently, it only affects how the request is accounted
* in the perf counters.
@@ -1116,7 +1129,7 @@ Retry:
uint64 ring_index;
neon_request_lsns *lsns;
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
continue;
if (frlsns)
@@ -2371,7 +2384,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LSN_FORMAT_ARGS(last_written_lsn),
LSN_FORMAT_ARGS(flushlsn));
XLogFlush(last_written_lsn);
flushlsn = last_written_lsn;
}
/*
@@ -2387,18 +2399,35 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* requesting the latest page, by setting request LSN to
* UINT64_MAX.
*
* Remember the current LSN, however, so that we can later
* correctly determine if the response to the request is still
* valid. The most up-to-date LSN we could use for that purpose
* would be the current insert LSN, but to avoid the overhead of
* looking it up, use 'flushlsn' instead. This relies on the
* assumption that if the page was modified since the last WAL
* flush, it should still be in the buffer cache, and we
* wouldn't be requesting it.
* effective_request_lsn is used to check that received response is still valid.
* In case of primary node it is last written LSN. Originally we used flush_lsn here,
* but it is not correct. Consider the following scenario:
* 1. Backend A wants to prefetch block X
* 2. Backend A checks that block X is not present in the shared buffer cache
* 3. Backend A calls prefetch_do_request, which calls neon_get_request_lsns
* 4. neon_get_request_lsns obtains LwLSN=11 for the block
* 5. Backend B downloads block X, updates and wallogs it with LSN=13
* 6. Block X is once again evicted from shared buffers, its LwLSN is set to LSN=13
* 7. Backend A is still executing in neon_get_request_lsns(). It calls 'flushlsn = GetFlushRecPtr();'.
* Let's say that it is LSN=14
* 8. Backend A uses LSN=14 as effective_request_lsn in the prefetch slot. The request stored in the slot is
* [not_modified_since=11, effective_request_lsn=14]
* 9. Backend A sends the prefetch request, pageserver processes it, and sends response.
* The last LSN that the pageserver had processed was LSN=12, so the page image in the response is valid at LSN=12.
* 10. Backend A calls smgrread() for page X with LwLSN=13
* 11. Backend A finds in prefetch ring the response for the prefetch request with [not_modified_since=11, effective_request_lsn=14],
* so it satisfies neon_prefetch_response_usable condition.
*
* Things go wrong in step 7-8, when [not_modified_since=11, effective_request_lsn=14] is determined for the request.
* That is incorrect, because the page has in fact been modified at LSN=13. The invariant is that for any request,
* there should not be any modifications to a page between its not_modified_since and (effective_)request_lsn values.
*
* The problem can be fixed by calling GetFlushRecPtr() before checking if the page is in the buffer cache.
* But that can't be done within smgrprefetch(); the caller would need to be modified.
*/
result->request_lsn = UINT64_MAX;
result->not_modified_since = last_written_lsn;
result->effective_request_lsn = flushlsn;
result->effective_request_lsn = last_written_lsn;
}
}
}
@@ -2457,11 +2486,8 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
* we remember `effective_request_lsn` separately. In a primary,
* `effective_request_lsn` is the last flush WAL position when the request
* was sent to the pageserver. That's logically the LSN that we are
* requesting the page at, but we send UINT64_MAX to the pageserver so
* that if the GC horizon advances past that position, we still get a
* valid response instead of an error.
* `effective_request_lsn` is the same as `not_modified_since`.
* See the comments in neon_get_request_lsns for why we cannot use the last flush WAL position here.
*
* To determine whether a response to a GetPage request issued earlier is
* still valid to satisfy a new page read, we look at the
@@ -3016,9 +3042,6 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
tag.blockNum = blocknum;
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_present[i] = ~(lfc_present[i]);
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
lfc_present, true);
@@ -3124,6 +3147,15 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
#endif
}
/*
* Read N pages at a specific LSN.
*
* *mask is set for pages read at a previous point in time, and which we
* should not touch, nor overwrite.
* New bits should be set in *mask for the pages we successfully read.
*
* The offsets in request_lsns, buffers, and mask are linked.
*/
static void
#if PG_MAJORVERSION_NUM < 16
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
@@ -3176,7 +3208,7 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block
neon_request_lsns *reqlsns = &request_lsns[i];
TimestampTz start_ts, end_ts;
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
continue;
start_ts = GetCurrentTimestamp();
@@ -3277,6 +3309,12 @@ Retry:
}
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
* from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here
* under buffer lock.
*/
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forkNum, blockno, buffer);
break;
@@ -3469,9 +3507,7 @@ static void
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
{
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
bits8 lfc_hits[PG_IOV_MAX / 8];
bits8 read[PG_IOV_MAX / 8];
bits8 read_pages[PG_IOV_MAX / 8];
neon_request_lsns request_lsns[PG_IOV_MAX];
int lfc_result;
int prefetch_result;
@@ -3503,19 +3539,18 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
memset(read_pages, 0, sizeof(read_pages));
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum,
blocknum, request_lsns, nblocks,
buffers, read_pages);
if (prefetch_result == nblocks)
return;
/* invert the result: exclude prefetched blocks */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_hits[i] = ~prefetch_hits[i];
/* Try to read from local file cache */
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
nblocks, lfc_hits);
nblocks, read_pages);
if (lfc_result > 0)
MyNeonCounters->file_cache_hits_total += lfc_result;
@@ -3524,21 +3559,8 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
if (prefetch_result + lfc_result == nblocks)
return;
if (lfc_result <= 0)
{
/* can't use the LFC result, so read all blocks from PS */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~prefetch_hits[i];
}
else
{
/* invert the result: exclude blocks read from lfc */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
}
neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
buffers, nblocks, read);
buffers, nblocks, read_pages);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.

View File

@@ -99,6 +99,9 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp->config = config;
wp->api = api;
wp->state = WPS_COLLECTING_TERMS;
wp->mconf.generation = INVALID_GENERATION;
wp->mconf.members.len = 0;
wp->mconf.new_members.len = 0;
wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list);
@@ -170,6 +173,8 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
if (wp->config->proto_version != 2 && wp->config->proto_version != 3)
wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version);
if (wp->safekeepers_generation > 0 && wp->config->proto_version < 3)
wp_log(FATAL, "enabling generations requires protocol version 3");
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
/* Fill the greeting package */
@@ -214,7 +219,7 @@ WalProposerFree(WalProposer *wp)
static bool
WalProposerGenerationsEnabled(WalProposer *wp)
{
return wp->safekeepers_generation != 0;
return wp->safekeepers_generation != INVALID_GENERATION;
}
/*
@@ -723,13 +728,176 @@ SendProposerGreeting(Safekeeper *sk)
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
}
/*
* Assuming `sk` sent its node id, find such member(s) in wp->mconf and set ptr in
* members_safekeepers & new_members_safekeepers to sk.
*/
static void
UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
/* members_safekeepers etc are fixed size, sanity check mconf size */
if (wp->mconf.members.len > MAX_SAFEKEEPERS)
wp_log(FATAL, "too many members %d in mconf", wp->mconf.members.len);
if (wp->mconf.new_members.len > MAX_SAFEKEEPERS)
wp_log(FATAL, "too many new_members %d in mconf", wp->mconf.new_members.len);
/* node id is not known until greeting is received */
if (sk->state < SS_WAIT_VOTING)
return;
/* 0 is assumed to be invalid node id, should never happen */
if (sk->greetResponse.nodeId == 0)
{
wp_log(WARNING, "safekeeper %s:%s sent zero node id", sk->host, sk->port);
return;
}
for (uint32 i = 0; i < wp->mconf.members.len; i++)
{
SafekeeperId *sk_id = &wp->mconf.members.m[i];
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
{
/*
* If mconf or list of safekeepers to connect to changed (the
* latter always currently goes through restart though),
* ResetMemberSafekeeperPtrs is expected to be called before
* UpdateMemberSafekeeperPtr. So, other value suggests that we are
* connected to the same sk under different host name, complain
* about that.
*/
if (wp->members_safekeepers[i] != NULL && wp->members_safekeepers[i] != sk)
{
wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in members[%u] is already mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, wp->members_safekeepers[i] - wp->safekeeper);
}
wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in members[%u] mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper);
wp->members_safekeepers[i] = sk;
}
}
/* repeat for new_members */
for (uint32 i = 0; i < wp->mconf.new_members.len; i++)
{
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
{
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
{
wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] is already mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, wp->new_members_safekeepers[i] - wp->safekeeper);
}
wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper);
wp->new_members_safekeepers[i] = sk;
}
}
}
/*
* Reset wp->members_safekeepers & new_members_safekeepers and refill them.
* Called after wp changes mconf.
*/
static void
ResetMemberSafekeeperPtrs(WalProposer *wp)
{
memset(&wp->members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS);
memset(&wp->new_members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS);
for (int i = 0; i < wp->n_safekeepers; i++)
{
if (wp->safekeeper[i].state >= SS_WAIT_VOTING)
UpdateMemberSafekeeperPtr(wp, &wp->safekeeper[i]);
}
}
static uint32
MsetQuorum(MemberSet *mset)
{
Assert(mset->len > 0);
return mset->len / 2 + 1;
}
/* Does n form a quorum in mset? */
static bool
MsetHasQuorum(MemberSet *mset, uint32 n)
{
return n >= MsetQuorum(mset);
}
/*
* TermsCollected helper for a single member set `mset`.
*
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
* or new_members_safekeepers.
*/
static bool
TermsCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s)
{
uint32 n_greeted = 0;
for (uint32 i = 0; i < mset->len; i++)
{
Safekeeper *sk = msk[i];
if (sk != NULL && sk->state == SS_WAIT_VOTING)
{
if (n_greeted > 0)
appendStringInfoString(s, ", ");
appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port);
n_greeted++;
}
}
appendStringInfo(s, ", %u/%u total", n_greeted, mset->len);
return MsetHasQuorum(mset, n_greeted);
}
/*
* Have we received greeting from enough (quorum) safekeepers to start voting?
*/
static bool
TermsCollected(WalProposer *wp)
{
return wp->n_connected >= wp->quorum;
StringInfoData s; /* str for logging */
bool collected = false;
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
{
collected = wp->n_connected >= wp->quorum;
if (collected)
{
wp->propTerm++;
wp_log(LOG, "walproposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT ", starting voting", wp->quorum, wp->propTerm);
}
return collected;
}
/*
* With generations enabled, we start campaign only when 1) some mconf is
* actually received 2) we have greetings from majority of members as well
* as from majority of new_members if it exists.
*/
if (wp->mconf.generation == INVALID_GENERATION)
return false;
initStringInfo(&s);
appendStringInfoString(&s, "mset greeters: ");
if (!TermsCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s))
goto res;
if (wp->mconf.new_members.len > 0)
{
appendStringInfoString(&s, ", new_mset greeters: ");
if (!TermsCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s))
goto res;
}
wp->propTerm++;
wp_log(LOG, "walproposer connected to quorum of safekeepers: %s, propTerm=" INT64_FORMAT ", starting voting", s.data, wp->propTerm);
collected = true;
res:
pfree(s.data);
return collected;
}
static void
@@ -753,13 +921,23 @@ RecvAcceptorGreeting(Safekeeper *sk)
pfree(mconf_toml);
/*
* Adopt mconf of safekeepers if it is higher. TODO: mconf change should
* restart wp if it started voting.
* Adopt mconf of safekeepers if it is higher.
*/
if (sk->greetResponse.mconf.generation > wp->mconf.generation)
{
/* sanity check before adopting, should never happen */
if (sk->greetResponse.mconf.members.len == 0)
{
wp_log(FATAL, "mconf %u has zero members", sk->greetResponse.mconf.generation);
}
/* TODO: put mconf to shmem to immediately pick it up on start */
if (wp->state >= WPS_CAMPAIGN)
{
wp_log(FATAL, "restarting to adopt mconf generation %u", sk->greetResponse.mconf.generation);
}
MembershipConfigurationFree(&wp->mconf);
MembershipConfigurationCopy(&sk->greetResponse.mconf, &wp->mconf);
ResetMemberSafekeeperPtrs(wp);
/* full conf was just logged above */
wp_log(LOG, "changed mconf to generation %u", wp->mconf.generation);
}
@@ -767,6 +945,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
/* Protocol is all good, move to voting. */
sk->state = SS_WAIT_VOTING;
/* In greeting safekeeper sent its id; update mappings accordingly. */
UpdateMemberSafekeeperPtr(wp, sk);
/*
* Note: it would be better to track the counter on per safekeeper basis,
* but at worst walproposer would restart with 'term rejected', so leave
@@ -778,12 +959,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
/* We're still collecting terms from the majority. */
wp->propTerm = Max(sk->greetResponse.term, wp->propTerm);
/* Quorum is acquried, prepare the vote request. */
/* Quorum is acquired, prepare the vote request. */
if (TermsCollected(wp))
{
wp->propTerm++;
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
wp->state = WPS_CAMPAIGN;
wp->voteRequest.pam.tag = 'v';
wp->voteRequest.generation = wp->mconf.generation;
@@ -899,6 +1077,44 @@ RecvVoteResponse(Safekeeper *sk)
}
}
/*
* VotesCollected helper for a single member set `mset`.
*
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
* or new_members_safekeepers.
*/
static bool
VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s)
{
uint32 n_votes = 0;
for (uint32 i = 0; i < mset->len; i++)
{
Safekeeper *sk = msk[i];
if (sk != NULL && sk->state == SS_WAIT_ELECTED)
{
if (GetLastLogTerm(sk) > wp->donorLastLogTerm ||
(GetLastLogTerm(sk) == wp->donorLastLogTerm &&
sk->voteResponse.flushLsn > wp->propTermStartLsn))
{
wp->donorLastLogTerm = GetLastLogTerm(sk);
wp->propTermStartLsn = sk->voteResponse.flushLsn;
wp->donor = sk;
}
wp->truncateLsn = Max(sk->voteResponse.truncateLsn, wp->truncateLsn);
if (n_votes > 0)
appendStringInfoString(s, ", ");
appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port);
n_votes++;
}
}
appendStringInfo(s, ", %u/%u total", n_votes, mset->len);
return MsetHasQuorum(mset, n_votes);
}
/*
* Checks if enough votes have been collected to get elected and if that's the
* case finds the highest vote, setting donor, donorLastLogTerm,
@@ -907,7 +1123,8 @@ RecvVoteResponse(Safekeeper *sk)
static bool
VotesCollected(WalProposer *wp)
{
int n_ready = 0;
StringInfoData s; /* str for logging */
bool collected = false;
/* assumed to be called only when not elected yet */
Assert(wp->state == WPS_CAMPAIGN);
@@ -916,25 +1133,61 @@ VotesCollected(WalProposer *wp)
wp->donorLastLogTerm = 0;
wp->truncateLsn = InvalidXLogRecPtr;
for (int i = 0; i < wp->n_safekeepers; i++)
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
{
if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
{
n_ready++;
int n_ready = 0;
if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm ||
(GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm &&
wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn))
for (int i = 0; i < wp->n_safekeepers; i++)
{
if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
{
wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]);
wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
wp->donor = i;
n_ready++;
if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm ||
(GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm &&
wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn))
{
wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]);
wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
wp->donor = &wp->safekeeper[i];
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
}
collected = n_ready >= wp->quorum;
if (collected)
{
wp_log(LOG, "walproposer elected with %d/%d votes", n_ready, wp->n_safekeepers);
}
return collected;
}
return n_ready >= wp->quorum;
/*
* if generations are enabled we're expected to get to voting only when
* mconf is established.
*/
Assert(wp->mconf.generation != INVALID_GENERATION);
/*
* We must get votes from both msets if both are present.
*/
initStringInfo(&s);
appendStringInfoString(&s, "mset votes: ");
if (!VotesCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s))
goto res;
if (wp->mconf.new_members.len > 0)
{
appendStringInfoString(&s, ", new_mset votes: ");
if (!VotesCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s))
goto res;
}
wp_log(LOG, "walproposer elected, %s", s.data);
collected = true;
res:
pfree(s.data);
return collected;
}
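
The joint-configuration rule used by the new `VotesCollected` (a quorum is needed in `members`, and also in `new_members` when that set is non-empty) can be sketched in isolation as below. This is a minimal illustration, not the walproposer code: `Voter`, `majority_of`, `set_has_quorum` and `votes_collected` are hypothetical names, and the majority formula `n / 2 + 1` is an assumption about what `MsetHasQuorum` checks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a safekeeper connection slot. */
typedef struct
{
	bool		voted;		/* true once this node granted its vote */
} Voter;

/* Assumed majority rule for a set of n members. */
static uint32_t
majority_of(uint32_t n)
{
	return n / 2 + 1;
}

/*
 * Count votes in one member set. A NULL slot models a member that is not
 * connected, so it cannot contribute a vote.
 */
static bool
set_has_quorum(Voter **members, uint32_t n)
{
	uint32_t	n_votes = 0;

	assert(n > 0);
	for (uint32_t i = 0; i < n; i++)
	{
		if (members[i] != NULL && members[i]->voted)
			n_votes++;
	}
	return n_votes >= majority_of(n);
}

/*
 * Election succeeds only if the current set has a quorum and, when a new
 * set is present (n_new > 0), the new set has a quorum as well.
 */
static bool
votes_collected(Voter **old_set, uint32_t n_old,
				Voter **new_set, uint32_t n_new)
{
	if (!set_has_quorum(old_set, n_old))
		return false;
	if (n_new > 0 && !set_has_quorum(new_set, n_new))
		return false;
	return true;
}
```

Requiring both majorities is what keeps two proposers from being elected in the same term during a membership change: any two winning vote sets must overlap in whichever member set they share.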
/*
@@ -955,7 +1208,7 @@ HandleElectedProposer(WalProposer *wp)
* that only for logical replication (and switching logical walsenders to
* neon_walreader is a todo.)
*/
if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor]))
if (!wp->api.recovery_download(wp, wp->donor))
{
wp_log(FATAL, "failed to download WAL for logical replication");
}
@@ -1078,7 +1331,7 @@ ProcessPropStartPos(WalProposer *wp)
/*
* Proposer's term history is the donor's + its own entry.
*/
dth = &wp->safekeeper[wp->donor].voteResponse.termHistory;
dth = &wp->donor->voteResponse.termHistory;
wp->propTermHistory.n_entries = dth->n_entries + 1;
wp->propTermHistory.entries = palloc(sizeof(TermSwitchEntry) * wp->propTermHistory.n_entries);
if (dth->n_entries > 0)
@@ -1086,11 +1339,10 @@ ProcessPropStartPos(WalProposer *wp)
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm;
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propTermStartLsn;
wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp->quorum,
wp_log(LOG, "walproposer elected in term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp->propTerm,
LSN_FORMAT_ARGS(wp->propTermStartLsn),
wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
wp->donor->host, wp->donor->port,
LSN_FORMAT_ARGS(wp->truncateLsn));
/*
@@ -1508,6 +1760,14 @@ RecvAppendResponses(Safekeeper *sk)
readAnything = true;
/* should never happen: sk is expected to send ERROR instead */
if (sk->appendResponse.generation != wp->mconf.generation)
{
wp_log(FATAL, "safekeeper {id = %lu, ep = %s:%s} sent response with generation %u, expected %u",
sk->greetResponse.nodeId, sk->host, sk->port,
sk->appendResponse.generation, wp->mconf.generation);
}
if (sk->appendResponse.term > wp->propTerm)
{
/*
@@ -1624,30 +1884,100 @@ CalculateMinFlushLsn(WalProposer *wp)
}
/*
* Calculate WAL position acknowledged by quorum
* GetAcknowledgedByQuorumWALPosition for a single member set `mset`.
*
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
* or new_members_safekeepers.
*/
static XLogRecPtr
GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
GetCommittedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk)
{
XLogRecPtr responses[MAX_SAFEKEEPERS];
/*
* Sort acknowledged LSNs
* Sort acknowledged LSNs in ascending order.
*/
for (int i = 0; i < wp->n_safekeepers; i++)
Assert(mset->len <= MAX_SAFEKEEPERS);
for (uint32 i = 0; i < mset->len; i++)
{
Safekeeper *sk = msk[i];
/*
* Like in Raft, we aren't allowed to commit entries from previous
* terms, so ignore reported LSN until it gets to epochStartLsn.
* terms, so ignore reported LSN until it gets to propTermStartLsn.
*
* Note: we ignore sk state, which is ok: before first ack flushLsn is
* 0, and later we just preserve value across reconnections. It would
* be ok to check for SS_ACTIVE as well.
*/
responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
if (sk != NULL && sk->appendResponse.flushLsn >= wp->propTermStartLsn)
{
responses[i] = sk->appendResponse.flushLsn;
}
else
{
responses[i] = 0;
}
}
qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn);
qsort(responses, mset->len, sizeof(XLogRecPtr), CompareLsn);
/*
* Get the smallest LSN committed by quorum
* And get value committed by the quorum. A way to view this: to get the
* highest value committed on the quorum, in the ordered array we skip n -
* n_quorum elements to get to the first (lowest) value present on all sks
* of the highest quorum.
*/
return responses[wp->n_safekeepers - wp->quorum];
return responses[mset->len - MsetQuorum(mset)];
}
/*
* Calculate WAL position acknowledged by quorum, i.e. which may be regarded
* committed.
*
* Zero may be returned when there is no quorum of nodes recovered to term start
* lsn which sent feedback yet.
*/
static XLogRecPtr
GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
{
XLogRecPtr committed;
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
{
XLogRecPtr responses[MAX_SAFEKEEPERS];
/*
* Sort acknowledged LSNs
*/
for (int i = 0; i < wp->n_safekeepers; i++)
{
/*
* Like in Raft, we aren't allowed to commit entries from previous
* terms, so ignore reported LSN until it gets to propTermStartLsn.
*
* Note: we ignore sk state, which is ok: before first ack
* flushLsn is 0, and later we just preserve value across
* reconnections. It would be ok to check for SS_ACTIVE as well.
*/
responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
}
qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn);
/*
* Get the smallest LSN committed by quorum
*/
return responses[wp->n_safekeepers - wp->quorum];
}
committed = GetCommittedMset(wp, &wp->mconf.members, wp->members_safekeepers);
if (wp->mconf.new_members.len > 0)
{
XLogRecPtr new_mset_committed = GetCommittedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers);
committed = Min(committed, new_mset_committed);
}
return committed;
}
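
The commit-position computation above (zero out acks below the term start, sort ascending, index at `len - quorum`, then take the minimum across both member sets) can be tried out standalone with the sketch below. It is an illustration under assumptions: `lsn_t` stands in for `XLogRecPtr`, `MAX_MEMBERS` for `MAX_SAFEKEEPERS`, `quorum_size` assumes `MsetQuorum` is a simple majority, and all function names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_MEMBERS 32			/* hypothetical cap on member set size */

typedef uint64_t lsn_t;			/* stand-in for XLogRecPtr */

/* qsort comparator giving ascending order without overflow. */
static int
compare_lsn(const void *a, const void *b)
{
	lsn_t		x = *(const lsn_t *) a;
	lsn_t		y = *(const lsn_t *) b;

	return (x > y) - (x < y);
}

/* Assumed majority rule for a set of n members. */
static uint32_t
quorum_size(uint32_t n)
{
	return n / 2 + 1;
}

/*
 * Highest LSN flushed by a majority of one member set. Acks below
 * term_start_lsn are treated as 0, since entries from previous terms
 * must not be committed by counting replicas.
 */
static lsn_t
committed_in_set(const lsn_t *flush_lsns, uint32_t n, lsn_t term_start_lsn)
{
	lsn_t		responses[MAX_MEMBERS];

	assert(n > 0 && n <= MAX_MEMBERS);
	for (uint32_t i = 0; i < n; i++)
		responses[i] = flush_lsns[i] >= term_start_lsn ? flush_lsns[i] : 0;
	qsort(responses, n, sizeof(lsn_t), compare_lsn);

	/* Skip the n - quorum lowest values; the next one is held by a quorum. */
	return responses[n - quorum_size(n)];
}

/* In a joint configuration the commit point is the minimum over both sets. */
static lsn_t
committed_joint(const lsn_t *old_set, uint32_t n_old,
				const lsn_t *new_set, uint32_t n_new,
				lsn_t term_start_lsn)
{
	lsn_t		committed = committed_in_set(old_set, n_old, term_start_lsn);

	if (n_new > 0)
	{
		lsn_t		c = committed_in_set(new_set, n_new, term_start_lsn);

		if (c < committed)
			committed = c;
	}
	return committed;
}
```

For example, with acks `{100, 300, 200}` and a term start of 50, the sorted array is `{100, 200, 300}` and index `3 - 2 = 1` yields 200, the highest position present on two of three nodes. A result of 0 means no quorum has caught up to the term start yet.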
/*
@@ -1673,9 +2003,9 @@ UpdateDonorShmem(WalProposer *wp)
* about its position immediately after election before any feedbacks are
* sent.
*/
if (wp->safekeeper[wp->donor].state >= SS_WAIT_ELECTED)
if (wp->donor->state >= SS_WAIT_ELECTED)
{
donor = &wp->safekeeper[wp->donor];
donor = wp->donor;
donor_lsn = wp->propTermStartLsn;
}


@@ -145,6 +145,7 @@ typedef uint64 NNodeId;
* This and following structs pair ones in membership.rs.
*/
typedef uint32 Generation;
#define INVALID_GENERATION 0
typedef struct SafekeeperId
{
@@ -771,7 +772,17 @@ typedef struct WalProposer
/* Current walproposer membership configuration */
MembershipConfiguration mconf;
/* (n_safekeepers / 2) + 1 */
/*
* Parallels mconf.members with pointers to the member's slot in
* safekeepers array of connections, or NULL if such member is not
* connected. Helps to avoid searching through all of .safekeepers[] for a
* slot by id when doing quorum checks.
*/
Safekeeper *members_safekeepers[MAX_SAFEKEEPERS];
/* As above, but for new_members. */
Safekeeper *new_members_safekeepers[MAX_SAFEKEEPERS];
/* (n_safekeepers / 2) + 1. Used for static pre-generations quorum checks. */
int quorum;
/*
@@ -829,7 +840,7 @@ typedef struct WalProposer
term_t donorLastLogTerm;
/* Most advanced acceptor */
int donor;
Safekeeper *donor;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;

40
poetry.lock generated

@@ -3111,30 +3111,30 @@ six = "*"
[[package]]
name = "ruff"
version = "0.7.0"
version = "0.11.2"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
groups = ["dev"]
files = [
{file = "ruff-0.7.0-py3-none-linux_armv6l.whl", hash = "sha256:0cdf20c2b6ff98e37df47b2b0bd3a34aaa155f59a11182c1303cce79be715628"},
{file = "ruff-0.7.0-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:496494d350c7fdeb36ca4ef1c9f21d80d182423718782222c29b3e72b3512737"},
{file = "ruff-0.7.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:214b88498684e20b6b2b8852c01d50f0651f3cc6118dfa113b4def9f14faaf06"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:630fce3fefe9844e91ea5bbf7ceadab4f9981f42b704fae011bb8efcaf5d84be"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:211d877674e9373d4bb0f1c80f97a0201c61bcd1e9d045b6e9726adc42c156aa"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:194d6c46c98c73949a106425ed40a576f52291c12bc21399eb8f13a0f7073495"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:82c2579b82b9973a110fab281860403b397c08c403de92de19568f32f7178598"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9af971fe85dcd5eaed8f585ddbc6bdbe8c217fb8fcf510ea6bca5bdfff56040e"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b641c7f16939b7d24b7bfc0be4102c56562a18281f84f635604e8a6989948914"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d71672336e46b34e0c90a790afeac8a31954fd42872c1f6adaea1dff76fd44f9"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ab7d98c7eed355166f367597e513a6c82408df4181a937628dbec79abb2a1fe4"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:1eb54986f770f49edb14f71d33312d79e00e629a57387382200b1ef12d6a4ef9"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_i686.whl", hash = "sha256:dc452ba6f2bb9cf8726a84aa877061a2462afe9ae0ea1d411c53d226661c601d"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:4b406c2dce5be9bad59f2de26139a86017a517e6bcd2688da515481c05a2cb11"},
{file = "ruff-0.7.0-py3-none-win32.whl", hash = "sha256:f6c968509f767776f524a8430426539587d5ec5c662f6addb6aa25bc2e8195ec"},
{file = "ruff-0.7.0-py3-none-win_amd64.whl", hash = "sha256:ff4aabfbaaba880e85d394603b9e75d32b0693152e16fa659a3064a85df7fce2"},
{file = "ruff-0.7.0-py3-none-win_arm64.whl", hash = "sha256:10842f69c245e78d6adec7e1db0a7d9ddc2fff0621d730e61657b64fa36f207e"},
{file = "ruff-0.7.0.tar.gz", hash = "sha256:47a86360cf62d9cd53ebfb0b5eb0e882193fc191c6d717e8bef4462bc3b9ea2b"},
{file = "ruff-0.11.2-py3-none-linux_armv6l.whl", hash = "sha256:c69e20ea49e973f3afec2c06376eb56045709f0212615c1adb0eda35e8a4e477"},
{file = "ruff-0.11.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:2c5424cc1c4eb1d8ecabe6d4f1b70470b4f24a0c0171356290b1953ad8f0e272"},
{file = "ruff-0.11.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:ecf20854cc73f42171eedb66f006a43d0a21bfb98a2523a809931cda569552d9"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0c543bf65d5d27240321604cee0633a70c6c25c9a2f2492efa9f6d4b8e4199bb"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:20967168cc21195db5830b9224be0e964cc9c8ecf3b5a9e3ce19876e8d3a96e3"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:955a9ce63483999d9f0b8f0b4a3ad669e53484232853054cc8b9d51ab4c5de74"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:86b3a27c38b8fce73bcd262b0de32e9a6801b76d52cdb3ae4c914515f0cef608"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a3b66a03b248c9fcd9d64d445bafdf1589326bee6fc5c8e92d7562e58883e30f"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0397c2672db015be5aa3d4dac54c69aa012429097ff219392c018e21f5085147"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:869bcf3f9abf6457fbe39b5a37333aa4eecc52a3b99c98827ccc371a8e5b6f1b"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:2a2b50ca35457ba785cd8c93ebbe529467594087b527a08d487cf0ee7b3087e9"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:7c69c74bf53ddcfbc22e6eb2f31211df7f65054bfc1f72288fc71e5f82db3eab"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6e8fb75e14560f7cf53b15bbc55baf5ecbe373dd5f3aab96ff7aa7777edd7630"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:842a472d7b4d6f5924e9297aa38149e5dcb1e628773b70e6387ae2c97a63c58f"},
{file = "ruff-0.11.2-py3-none-win32.whl", hash = "sha256:aca01ccd0eb5eb7156b324cfaa088586f06a86d9e5314b0eb330cb48415097cc"},
{file = "ruff-0.11.2-py3-none-win_amd64.whl", hash = "sha256:3170150172a8f994136c0c66f494edf199a0bbea7a409f649e4bc8f4d7084080"},
{file = "ruff-0.11.2-py3-none-win_arm64.whl", hash = "sha256:52933095158ff328f4c77af3d74f0379e34fd52f175144cefc1b192e7ccd32b4"},
{file = "ruff-0.11.2.tar.gz", hash = "sha256:ec47591497d5a1050175bdf4e1a4e6272cddff7da88a2ad595e1e326041d8d94"},
]
[[package]]
@@ -3844,4 +3844,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "715fc8c896dcfa1b15054deeddcdec557ef93af91b26e1c8e4688fe4dbef5296"
content-hash = "fb50cb6b291169dce3188560cdb31a14af95647318f8f0f0d718131dbaf1817a"


@@ -314,9 +314,9 @@ pub async fn run() -> anyhow::Result<()> {
None => {
bail!("plain auth requires redis_notifications to be set");
}
Some(url) => Some(
ConnectionWithCredentialsProvider::new_with_static_credentials(url.to_string()),
),
Some(url) => {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
}
},
("irsa", _) => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(


@@ -1,5 +1,6 @@
//! Mock console backend which relies on a user-provided postgres instance.
use std::io;
use std::net::{IpAddr, Ipv4Addr};
use std::str::FromStr;
use std::sync::Arc;
@@ -22,7 +23,6 @@ use crate::control_plane::errors::{
};
use crate::control_plane::messages::MetricsAuxInfo;
use crate::control_plane::{AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo};
use crate::error::io_error;
use crate::intern::RoleNameInt;
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
use crate::url::ApiUrl;
@@ -36,13 +36,13 @@ enum MockApiError {
impl From<MockApiError> for ControlPlaneError {
fn from(e: MockApiError) -> Self {
io_error(e).into()
io::Error::other(e).into()
}
}
impl From<tokio_postgres::Error> for ControlPlaneError {
fn from(e: tokio_postgres::Error) -> Self {
io_error(e).into()
io::Error::other(e).into()
}
}


@@ -1,8 +1,10 @@
use std::io;
use thiserror::Error;
use crate::control_plane::client::ApiLockError;
use crate::control_plane::messages::{self, ControlPlaneErrorMessage, Reason};
use crate::error::{ErrorKind, ReportableError, UserFacingError, io_error};
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::proxy::retry::CouldRetry;
/// A go-to error message which doesn't leak any detail.
@@ -79,13 +81,13 @@ impl CouldRetry for ControlPlaneError {
impl From<reqwest::Error> for ControlPlaneError {
fn from(e: reqwest::Error) -> Self {
io_error(e).into()
io::Error::other(e).into()
}
}
impl From<reqwest_middleware::Error> for ControlPlaneError {
fn from(e: reqwest_middleware::Error) -> Self {
io_error(e).into()
io::Error::other(e).into()
}
}


@@ -1,15 +1,9 @@
use std::error::Error as StdError;
use std::{fmt, io};
use std::fmt;
use anyhow::Context;
use measured::FixedCardinalityLabel;
use tokio::task::JoinError;
/// Upcast (almost) any error into an opaque [`io::Error`].
pub(crate) fn io_error(e: impl Into<Box<dyn StdError + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
/// Marks errors that may be safely shown to a client.
/// This trait can be seen as a specialized version of [`ToString`].
///


@@ -163,8 +163,7 @@ fn process_proxy_payload(
// other values are unassigned and must not be emitted by senders. Receivers
// must drop connections presenting unexpected values here.
#[rustfmt::skip] // https://github.com/rust-lang/rustfmt/issues/6384
_ => return Err(io::Error::new(
io::ErrorKind::Other,
_ => return Err(io::Error::other(
format!(
"invalid proxy protocol command 0x{:02X}. expected local (0x20) or proxy (0x21)",
header.version_and_command
@@ -178,21 +177,20 @@ fn process_proxy_payload(
TCP_OVER_IPV4 | UDP_OVER_IPV4 => {
let addr = payload
.try_get::<ProxyProtocolV2HeaderV4>()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
.ok_or_else(|| io::Error::other(size_err))?;
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
}
TCP_OVER_IPV6 | UDP_OVER_IPV6 => {
let addr = payload
.try_get::<ProxyProtocolV2HeaderV6>()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
.ok_or_else(|| io::Error::other(size_err))?;
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
}
// unspecified or unix stream. ignore the addresses
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
return Err(io::Error::other(
"invalid proxy protocol address family/transport protocol.",
));
}


@@ -143,6 +143,8 @@ impl ConnectionWithCredentialsProvider {
db: 0,
username: Some(username),
password: Some(password.clone()),
// TODO: switch to RESP3 after testing new client version.
protocol: redis::ProtocolVersion::RESP2,
},
})
}


@@ -19,7 +19,7 @@ fn json_value_to_pg_text(value: &Value) -> Option<String> {
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
// avoid escaping here, as we pass this as a parameter
Value::String(s) => Some(s.to_string()),
Value::String(s) => Some(s.clone()),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),


@@ -866,7 +866,7 @@ impl QueryData {
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
let res = match select(
match select(
pin!(query_to_json(
config,
&mut *inner,
@@ -889,7 +889,7 @@ impl QueryData {
// The query failed with an error
Either::Left((Err(e), __not_yet_cancelled)) => {
discard.discard();
return Err(e);
Err(e)
}
// The query was cancelled.
Either::Right((_cancelled, query)) => {
@@ -930,8 +930,7 @@ impl QueryData {
}
}
}
};
res
}
}
}


@@ -15,7 +15,7 @@ use tracing::warn;
use crate::cancellation::CancellationHandler;
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::{ReportableError, io_error};
use crate::error::ReportableError;
use crate::metrics::Metrics;
use crate::proxy::{ClientMode, ErrorSource, handle_client};
use crate::rate_limiter::EndpointRateLimiter;
@@ -50,23 +50,23 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
let this = self.project();
let mut stream = this.stream;
ready!(stream.as_mut().poll_ready(cx).map_err(io_error))?;
ready!(stream.as_mut().poll_ready(cx).map_err(io::Error::other))?;
this.send.put(buf);
match stream.as_mut().start_send(Frame::binary(this.send.split())) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(e) => Poll::Ready(Err(io_error(e))),
Err(e) => Poll::Ready(Err(io::Error::other(e))),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = self.project().stream;
stream.poll_flush(cx).map_err(io_error)
stream.poll_flush(cx).map_err(io::Error::other)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = self.project().stream;
stream.poll_close(cx).map_err(io_error)
stream.poll_close(cx).map_err(io::Error::other)
}
}
@@ -97,7 +97,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
}
let res = ready!(this.stream.as_mut().poll_next(cx));
match res.transpose().map_err(io_error)? {
match res.transpose().map_err(io::Error::other)? {
Some(message) => match message.opcode {
OpCode::Ping => {}
OpCode::Pong => {}
@@ -105,7 +105,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
// We expect to see only binary messages.
let error = "unexpected text message in the websocket";
warn!(length = message.payload.len(), error);
return Poll::Ready(Err(io_error(error)));
return Poll::Ready(Err(io::Error::other(error)));
}
OpCode::Binary | OpCode::Continuation => {
debug_assert!(this.recv.is_empty());


@@ -173,7 +173,7 @@ impl CertResolver {
}
pub fn get_common_names(&self) -> HashSet<String> {
self.certs.keys().map(|s| s.to_string()).collect()
self.certs.keys().cloned().collect()
}
}


@@ -53,7 +53,7 @@ jsonnet = "^0.21.0-rc2"
[tool.poetry.group.dev.dependencies]
mypy = "==1.13.0"
ruff = "^0.7.0"
ruff = "^0.11.2"
[build-system]
requires = ["poetry-core>=1.0.0"]
@@ -109,4 +109,5 @@ select = [
"W", # pycodestyle
"B", # bugbear
"UP", # pyupgrade
"TC", # flake8-type-checking
]


@@ -38,9 +38,8 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
/// Failed to create client.
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
#[error("request timed out: {0}")]
Timeout(String),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -120,6 +119,12 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
let resp = self.request(Method::DELETE, &uri, ()).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn bump_timeline_term(
&self,
tenant_id: TenantId,


@@ -21,7 +21,7 @@ use safekeeper::defaults::{
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
DEFAULT_SSL_KEY_FILE,
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
};
use safekeeper::{
BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, WAL_SERVICE_RUNTIME, broker,
@@ -214,7 +214,10 @@ struct Args {
/// Path to a file with a X509 certificate for https API.
#[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
ssl_cert_file: Utf8PathBuf,
/// Trusted root CA certificate to use in https APIs.
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<Utf8PathBuf>,
}
@@ -350,13 +353,13 @@ async fn main() -> anyhow::Result<()> {
}
};
let ssl_ca_cert = match args.ssl_ca_file.as_ref() {
let ssl_ca_certs = match args.ssl_ca_file.as_ref() {
Some(ssl_ca_file) => {
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
let buf = tokio::fs::read(ssl_ca_file).await?;
Some(Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
};
let conf = Arc::new(SafeKeeperConf {
@@ -394,7 +397,8 @@ async fn main() -> anyhow::Result<()> {
max_delta_for_fanout: args.max_delta_for_fanout,
ssl_key_file: args.ssl_key_file,
ssl_cert_file: args.ssl_cert_file,
ssl_ca_cert,
ssl_cert_reload_period: args.ssl_cert_reload_period,
ssl_ca_certs,
});
// initialize sentry if SENTRY_DSN is provided


@@ -1,6 +1,7 @@
pub mod routes;
use std::sync::Arc;
use http_utils::tls_certs::ReloadingCertificateResolver;
pub use routes::make_router;
pub use safekeeper_api::models;
use tokio_util::sync::CancellationToken;
@@ -29,12 +30,16 @@ pub async fn task_main_https(
https_listener: std::net::TcpListener,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let certs = http_utils::tls_certs::load_cert_chain(&conf.ssl_cert_file)?;
let key = http_utils::tls_certs::load_private_key(&conf.ssl_key_file)?;
let cert_resolver = ReloadingCertificateResolver::new(
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,
)
.await?;
let server_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)?;
.with_cert_resolver(cert_resolver);
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));


@@ -235,7 +235,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
let resp = pull_timeline::handle_request(
data,
conf.sk_auth_token.clone(),
conf.ssl_ca_cert.clone(),
conf.ssl_ca_certs.clone(),
global_timelines,
)
.await


@@ -73,6 +73,7 @@ pub mod defaults {
pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
pub const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
}
#[derive(Debug, Clone)]
@@ -118,7 +119,8 @@ pub struct SafeKeeperConf {
pub max_delta_for_fanout: Option<u64>,
pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf,
pub ssl_ca_cert: Option<Certificate>,
pub ssl_cert_reload_period: Duration,
pub ssl_ca_certs: Vec<Certificate>,
}
impl SafeKeeperConf {
@@ -166,7 +168,8 @@ impl SafeKeeperConf {
max_delta_for_fanout: None,
ssl_key_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
ssl_ca_cert: None,
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_certs: Vec::new(),
}
}
}


@@ -393,7 +393,7 @@ pub struct DebugDumpResponse {
pub async fn handle_request(
request: PullTimelineRequest,
sk_auth_token: Option<SecretString>,
ssl_ca_cert: Option<Certificate>,
ssl_ca_certs: Vec<Certificate>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<PullTimelineResponse> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
@@ -405,7 +405,7 @@ pub async fn handle_request(
}
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;


@@ -182,7 +182,8 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
max_delta_for_fanout: None,
ssl_key_file: Utf8PathBuf::from(""),
ssl_cert_file: Utf8PathBuf::from(""),
ssl_ca_cert: None,
ssl_cert_reload_period: Duration::ZERO,
ssl_ca_certs: Vec::new(),
};
let mut global = GlobalMap::new(disk, conf.clone())?;


@@ -8,9 +8,12 @@
from __future__ import annotations
import argparse
from typing import TYPE_CHECKING
import psycopg2
from psycopg2.extensions import connection as PgConnection
if TYPE_CHECKING:
from psycopg2.extensions import connection as PgConnection
def main(args: argparse.Namespace):


@@ -7,13 +7,13 @@ import logging
import signal
import sys
from collections import defaultdict
from collections.abc import Awaitable
from dataclasses import dataclass
from typing import TYPE_CHECKING
import aiohttp
if TYPE_CHECKING:
from collections.abc import Awaitable
from typing import Any


@@ -16,10 +16,11 @@ testing = []
[dependencies]
anyhow.workspace = true
bytes.workspace = true
camino.workspace = true
chrono.workspace = true
clap.workspace = true
cron.workspace = true
clashmap.workspace = true
cron.workspace = true
fail.workspace = true
futures.workspace = true
governor.workspace = true
@@ -44,8 +45,9 @@ rustls-native-certs.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
measured.workspace = true
rustls.workspace = true


@@ -624,16 +624,19 @@ impl ComputeHook {
MaybeSendResult::Transmit((request, lock)) => (request, lock),
};
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let result = if let Some(notify_url) = &compute_hook_url {
self.config.compute_hook_url.clone()
};
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {


@@ -8,7 +8,6 @@ use futures::StreamExt;
use futures::stream::FuturesUnordered;
use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
use pageserver_api::models::PageserverUtilization;
use reqwest::Certificate;
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use thiserror::Error;
@@ -27,8 +26,8 @@ struct HeartbeaterTask<Server, State> {
max_offline_interval: Duration,
max_warming_up_interval: Duration,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
}
#[derive(Debug, Clone)]
@@ -76,8 +75,8 @@ where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
pub(crate) fn new(
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -86,8 +85,8 @@ where
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
let mut heartbeater = HeartbeaterTask::new(
receiver,
http_client,
jwt_token,
ssl_ca_cert,
max_offline_interval,
max_warming_up_interval,
cancel,
@@ -122,8 +121,8 @@ where
{
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -134,8 +133,8 @@ where
state: HashMap::new(),
max_offline_interval,
max_warming_up_interval,
http_client,
jwt_token,
ssl_ca_cert,
}
}
async fn run(&mut self) {
@@ -178,7 +177,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, node) in &*pageservers {
heartbeat_futs.push({
let ssl_ca_cert = self.ssl_ca_cert.clone();
let http_client = self.http_client.clone();
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
@@ -193,8 +192,8 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let response = node_clone
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),
@@ -329,19 +328,19 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
continue;
}
heartbeat_futs.push({
let http_client = self.http_client.clone();
let jwt_token = self
.jwt_token
.as_ref()
.map(|t| SecretString::from(t.to_owned()));
let ssl_ca_cert = self.ssl_ca_cert.clone();
let cancel = self.cancel.clone();
async move {
let response = sk
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),


@@ -24,9 +24,9 @@ use pageserver_api::controller_api::{
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
};
use pageserver_api::models::{
DetachBehavior, TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantShardSplitRequest, TenantTimeTravelRequest, TimelineArchivalConfigRequest,
TimelineCreateRequest,
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
@@ -582,6 +582,32 @@ async fn handle_tenant_timeline_download_heatmap_layers(
json_response(StatusCode::OK, ())
}
async fn handle_tenant_timeline_lsn_lease(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let lsn_lease_request = json_request::<LsnLeaseRequest>(&mut req).await?;
service
.tenant_timeline_lsn_lease(tenant_id, timeline_id, lsn_lease_request.lsn)
.await?;
json_response(StatusCode::OK, ())
}
// For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
// and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
// compare to, so we can just filter out our well known ID format with regexes.
@@ -656,11 +682,10 @@ async fn handle_tenant_timeline_passthrough(
let _timer = latency.start_timer(labels.clone());
let client = mgmt_api::Client::new(
service.get_http_client().clone(),
node.base_url(),
service.get_config().pageserver_jwt_token.as_deref(),
service.get_config().ssl_ca_cert.clone(),
)
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
);
let resp = client.get_raw(path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
@@ -2193,6 +2218,17 @@ pub fn make_router(
)
},
)
// LSN lease passthrough to all shards
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_lsn_lease,
RequestName("v1_tenant_timeline_lsn_lease"),
)
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(

Some files were not shown because too many files have changed in this diff.