Compare commits

..

40 Commits

Author SHA1 Message Date
Folke Behrens
4f88c4b8f3 proxy: introduce Acceptor and Connector traits 2025-01-03 15:53:39 +01:00
John Spray
c08759f367 storcon: verbose logs in rare case of shards not attached yet (#10262)
## Problem

When we do a timeline CRUD operation, we check that the shards we need
to mutate are currently attached to a pageserver, by reading
`generation` and `generation_pageserver` from the database.

If any don't appear to be attached, we respond with a a 503 and "One or
more shards in tenant is not yet attached".

This is happening more often than expected, and it's not obvious with
current logging what's going on: specifically which shard has a problem,
and exactly what we're seeing in these persistent generation columns.

(Aside: it's possible that we broke something with the change in #10011
which clears generation_pageserver when we detach a shard, although if
so the mechanism isn't trivial: what should happen is that if we stamp
on generation_pageserver if a reconciler is running, then it shouldn't
matter because we're about to

## Summary of changes

- When we are in Attached mode but find that
generation_pageserver/generation are unset, output details while looping
over shards.
2025-01-03 10:55:15 +00:00
John Spray
ba9722a2fd tests: add upload wait in test_scrubber_physical_gc_ancestors (#10260)
## Problem

We see periodic failures in `test_scrubber_physical_gc_ancestors`, where
the logs show that the pageserver is creating image layers that should
cause child shards to no longer reference their parents' layers, but
then the scrubber runs and doesn't find any unreferenced layers.[


https://neon-github-public-dev.s3.amazonaws.com/reports/pr-10256/12582034135/index.html#/testresult/78ea06dea6ba8dd3

From inspecting the code & test, it seems like this could be as simple
as the test failing to wait for uploads before running the scrubber. It
had a 2 second delay built in to satisfy the scrubbers time threshold
checks, which on a lightly loaded machine would also have been easily
enough for uploads to complete, but our test machines are more heavily
loaded all the time.

## Summary of changes

- Wait for uploads to complete after generating images layers in
test_scrubber_physical_gc_ancestors, so that the scrubber should
reliably see the post-compaction metadata.
2025-01-03 10:55:07 +00:00
John Spray
2d4f267983 cargo: update diesel, pq-sys (#10256)
## Problem

Versions of `diesel` and `pq-sys` were somewhat stale. I was checking on
libpq->openssl versions while investigating a segfault via
https://github.com/neondatabase/cloud/issues/21010. I don't think these
rust bindings are likely to be the source of issues, but we might as
well freshen them as a precaution.

## Summary of changes

- Update diesel to 2.2.6
- Update pq-sys to 0.6.3
2025-01-03 10:20:18 +00:00
Ivan Efremov
7a598b9842 [proxy/docs]imprv: Add local testing section to proxy README (#10230)
Add commands to run proxy locally with the mocked control plane
2025-01-03 10:04:58 +00:00
Tristan Partin
eefad27538 Inline various migration queries (#10231)
There was no value in saving them off to temporary variables.

Signed-off-by: Tristan Partin <tristan@neon.tech>

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-01-02 22:12:56 +00:00
Em Sharnoff
cd10c719f9 compute: Add spec support for disabling LFC resizing (#10132)
ref neondatabase/cloud#21731

## Problem

When we manually override the LFC size for particular computes,
autoscaling will typically undo that because vm-monitor will resize LFC
itself.

So, we'd like a way to make vm-monitor not set LFC size — this actually
already exists, if we just don't give vm-monitor a postgres connection
string.

## Summary of changes

Add a new field to the compute spec, `disable_lfc_resizing`. When set to
`true`, we pass in `None` for its postgres connection string. That
matches the configuration tested in `neondatabase/autoscaling` CI.
2025-01-02 19:45:59 +00:00
Tristan Partin
363ea97f69 Add more substantial tests for compute migrations (#9811)
The previous tests really didn't do much. This set should be quite a bit
more encompassing.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-01-02 18:37:50 +00:00
Conrad Ludgate
56e6ebfe17 chore: building compute_tools and local_proxy together (#10257)
## Problem

Building local_proxy and compute_tools features the same dependency
tree, but as they are currently built in separate clean layers all that
progress is wasted. For our arm builds that's an extra 10 minutes.

## Summary of changes

Combines the compute_tools and local_proxy build layers.
2025-01-02 16:05:14 +00:00
Raphael 'kena' Poss
1622fd8bda proxy: recognize but ignore the 3 new redis message types (#10197)
## Problem

https://neondb.slack.com/archives/C085MBDUSS2/p1734604792755369

## Summary of changes

Recognize and ignore the 3 new broadcast messages:
- `/block_public_or_vpc_access_updated`
- `/allowed_vpc_endpoints_updated_for_org`
- `/allowed_vpc_endpoints_updated_for_projects`
2025-01-02 16:02:48 +00:00
Konstantin Knizhnik
8c7dcd2598 Set heartbeat interval for chaos test (#10222)
## Problem

See https://neondb.slack.com/archives/C033RQ5SPDH/p1734707873215729

test_timeline_archival_chaos becomes more flaky with increased heartbeat
interval

Resolves #10250.

## Summary of changes

Override heatbeat interval for `test_timelirn_archival_chaos.py`

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-01-02 14:14:18 +00:00
Folke Behrens
ee22d4c9ef proxy: Set TCP_NODELAY for compute connections (#10240)
neondatabase/cloud#19184
2025-01-02 13:32:24 +00:00
JC Grünhage
26600f2973 Skip running clippy without default features (#10098)
## Problem

Running clippy with `cargo hack --feature-powerset` in CI isn't
particularly fast. This PR follows-up on
https://github.com/neondatabase/neon/pull/8912 to improve the speed of
our clippy runs.

Parallelism as suggested in
https://github.com/neondatabase/neon/issues/9901 was tested, but didn't
show consistent enough improvements to be worth it. It actually
increased the amount of work done, as there's less cache hits when
clippy runs are spread out over multiple target directories.
Additionally, parallelism makes it so caching needs to be thought about
more actively and copying around target directories to enable
parallelism eats the rest of the performance gains from parallel
execution.

After some discussion, the decision was to instead cut down on the
number of jobs that are running further. The easiest way to do this is
to not run clippy *without* default features. The list of default
features is empty for all crates, and I haven't found anything using
`cfg(feature = "default")` either, so this is likely not going to change
anything except speeding the runs up.

## Summary of changes

Reduce the amount of feature combinations tried by `cargo hack` (as
suggested in
https://github.com/neondatabase/neon/pull/8912#pullrequestreview-2286482368)
by never disabling default features.

## Alternatives

- We can split things out into different jobs which reduces the time
until everything is finished by running more things in parallel. This
does however decreases the amount of cache hits and increases the amount
of time spent on overhead tasks like repo cloning and restoring caches
by doing those multiple times instead of once.
- We could replace `cargo hack [...] clippy` with `cargo clippy [...];
cargo clippy --features testing`. I'm not 100% sure how this compares to
the change here in the PR, but it does seem to run a bit faster. That
likely means it's doing less work, but without understanding what
exactly we loose by that I'd rather not do that for now. I'd appreciate
input on this though.
2025-01-02 11:33:42 +00:00
Konstantin Knizhnik
b3cd883f93 Unlock LFC mutex when LFC cache is disabled (#10235)
## Problem

See https://github.com/neondatabase/neon/issues/10233
`lfc_containsv` returns with holding lock when LFC was disabled.
This bug was introduced in commit  78938d1b59

## Summary of changes

Release lock before return.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-01-02 11:28:15 +00:00
Conrad Ludgate
38c7a2abfc chore(proxy): pre-load native tls certificates and propagate compute client config (#10182)
Now that we construct the TLS client config for cancellation as well as
connect, it feels appropriate to construct the same config once and
re-use it elsewhere. It might also help should #7500 require any extra
setup, so we can easily add it to all the appropriate call sites.
2025-01-02 09:36:13 +00:00
Conrad Ludgate
f94248a594 chore(libs/proxy): refactor tokio-postgres connection control flow (#10247)
In #10207 it was clear there was some confusion with the current
connection logic. To analyse the flow to make sure there was no poll
stalling, I ended up with the following refactor.

Notable changes:
1. Now all functions called `poll_xyz` and that have a `cx: &mut
Context` argument must return a `Poll<_>` type, and can only return
`Pending` iff an internal poll call also returned `Pending`
2. State management is handled entirely by `poll_messages`. There are
now only 2 states which makes it much easier to keep track of.

Each commit should be self-reviewable and should be simple to verify
that it keeps the same behaviour
2025-01-02 09:35:28 +00:00
Alex Chi Z.
9c53b41245 fix(pageserver): update remote latest_gc_cutoff after gc-compaction (#10209)
## Problem

close https://github.com/neondatabase/neon/issues/10208
part of #9114 

## Summary of changes

* Ensure remote `latest_gc_cutoff` is up-to-date before removing any
files for gc-compaction.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-12-19 18:40:20 +00:00
Konstantin Knizhnik
197a89ab3d Increase default stotrage controller heartbeat interval from 100msec … (#10206)
## Problem

Currently default value of storage controller heartbeat interval is
100msec. It means that 10 times per second it establish connection to
PS. And it seems to be quite expensive.
At MacOS right now storage_controller consumes 70% CPU and trusts - 30%.
So together they completely utilize one core.
A lot of us has Macs. Let's save environment a little bit and do not
waste electricity and contribute to global warming.

By the way, on prod we have interval  10seconds 

## Summary of changes

Increase heartbeat interval from 100msec to 1 second.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-12-19 18:32:32 +00:00
Alex Chi Z.
b89e02f3e8 fix(pageserver): consider partial compaction layer map in layer check (#10044)
## Problem

In https://github.com/neondatabase/neon/pull/9897 we temporarily
disabled the layer valid check because the current one only considers
the end result of all compaction algorithms, but partial gc-compaction
would temporarily produce an "invalid" layer map.

part of https://github.com/neondatabase/neon/issues/9114

## Summary of changes

Allow LSN splits to overlap in the slow path check. Currently, the valid
check is only used in storage scrubber (background job) and during
gc-compaction (without taking layer lock). Therefore, it's fine for such
checks to be a little bit inefficient but more accurate.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-12-19 18:04:53 +00:00
Konstantin Knizhnik
04517c6ff3 Do not reload config file on PS reconnect (#10204)
## Problem

See https://github.com/neondatabase/neon/issues/10184
and
https://neondb.slack.com/archives/C04DGM6SMTM/p1733997259898819

Reloading config file inside parallel worker cause it's termination

## Summary of changes

Remove call of `HandleMainLoopInterrupts()` 
Update of page server URL is propagated by postmaster through shared
memory and we should not reload config for it.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-12-19 15:22:39 +00:00
Vlad Lazar
628451d68e safekeeper: short-circuit interpreted wal sender (#10202)
## Problem

Safekeeper may currently send a batch to the pageserver even if it
hasn't decoded a new record.
I think this is quite unlikely in the field, but worth adressing.

## Summary of changes

Don't send anything if we haven't decoded a full record. Once this
merges and releases, the `InterpretedWalRecords` struct can be updated
to remove the Option wrapper for `next_record_lsn`.
2024-12-19 14:04:46 +00:00
Vlad Lazar
502d512fe2 safekeeper: lift benchmarking utils into safekeeper crate (#10200)
## Problem

The benchmarking utilities are also useful for testing. We want to write
tests in the safekeeper crate.

## Summary of changes

This commit lifts the utils to the safekeeper crate. They are compiled
if the benchmarking features is enabled or if in test mode.
2024-12-19 14:04:42 +00:00
John Spray
afda6d4700 storage_scrubber: don't report half-created timelines as corruption (#10198)
## Problem

test_timeline_archival_chaos does timeline creation with failure
injection, and thereby sometimes leaves timelines in a part created
state. This was being reported as corruption by the scrubber on test
teardown, because it considered a layer without an index to be an
invalid state. This was incorrect: the scrubber should accept this
state, it occurs legitimately during timeline creation.

Closes: https://github.com/neondatabase/neon/issues/9988

## Summary of changes

- Report a timeline with layers but no index as Relic rather than
MissingIndexPart.
- We retain the MissingIndexPart variant for the case where an index
_was_ found in the listing, but was not found by a subsequent GET, i.e.
racing with deletion.
2024-12-19 12:55:05 +00:00
John Spray
65042cbadd tests: use high IO concurrency in test_pgdata_import_smoke, use effective_io_concurrency=2 in tests by default (#10114)
## Problem

`test_pgdata_import_smoke` writes two gigabytes of pages and then reads
them back serially. This is CPU bottlenecked and results in a long
runtime, and sensitivity to CPU load from other tests on the same
machine.

Closes: https://github.com/neondatabase/neon/issues/10071

## Summary of changes

- Use effective_io_concurrency=32 when doing sequential scans through
2GiB of pages in test_pgdata_import_smoke. This is a ~10x runtime
decrease in the parts of the test that do sequential scans.
- Also set `effective_io_concurrency=2` for tests, as I noticed while
debugging that we were doing all getpage requests serially, which is bad
for checking the stability of the batching code.
2024-12-19 10:58:49 +00:00
Folke Behrens
b135194090 proxy: Delay SASL complete message until auth is done (#10189)
The final SASL complete message can be bundled with the remainder of the
auth flow messages until ReadyForQuery.

neondatabase/cloud#19184
2024-12-19 10:37:08 +00:00
Peter Bendel
43dc03459d Run pgbench on 10 GB scale factor on database with n relations (e.g. 10k) (#10172)
## Problem

We want to verify how much / if pgbench throughput and latency on Neon
suffers if the database contains many other relations, too.

## Summary of changes

Modify the benchmarking.yml pgbench-compare job to
- create an addiitional project at scale factor 10 GiB
- before running pgbench add n tables (initially 10k) to the database
- then compare the pgbench throughput and latency to the existing
pgbench-compare at 10 Gib scale factor

We use a realistic template for the n relations that is a partitioned
table with some realistic data types, indexes and constraints - similar
to a table that we use internally.

Example run:
https://github.com/neondatabase/neon/actions/runs/12377565956/job/34547386959
2024-12-19 10:25:44 +00:00
Christian Schwarz
a1b0558493 fast import: importer: use aws s3 cli (#10162)
## Problem

s5cmd doesn't pick up the pod service account

```
2024/12/16 16:26:01 Ignoring, HTTP credential provider invalid endpoint host, "169.254.170.23", only loopback hosts are allowed. <nil>
ERROR "ls s3://neon-dev-bulk-import-us-east-2/import-pgdata/fast-import/v1/br-wandering-hall-w2xobawv": NoCredentialProviders: no valid providers in chain. Deprecated. For verbose messaging see aws.Config.CredentialsChainVerboseErrors
```

## Summary of changes

Switch to offical CLI.


## Testing

Tested the pre-merge image in staging, using `job_image` override in
project settings.


https://neondb.slack.com/archives/C033RQ5SPDH/p1734554944391949?thread_ts=1734368383.258759&cid=C033RQ5SPDH

## Future Work

Switch back to s5cmd once https://github.com/peak/s5cmd/pull/769 gets
merged.

## Refs

- fixes https://github.com/neondatabase/cloud/issues/21876

---------

Co-authored-by: Gleb Novikov <NanoBjorn@users.noreply.github.com>
2024-12-19 10:04:17 +00:00
Alex Chi Z.
cc138b56f9 fix(pageserver): run psql in thread to avoid blocking (#10177)
## Problem

ref https://github.com/neondatabase/neon/issues/10170
ref https://github.com/neondatabase/neon/issues/9994

The psql command will block the main thread, causing other async tasks
to timeout (i.e., HTTP connect). Therefore, we need to move it to an I/O
executor thread.

## Summary of changes

* run psql connection in a thread

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: John Spray <john@neon.tech>
2024-12-19 09:45:06 +00:00
Konstantin Knizhnik
61fcf64c22 Fix flukyness of test_physical_and_logical_replicaiton.py (#10176)
## Problem

See https://github.com/neondatabase/neon/issues/10037
test_physical_and_logical_replication.py sometimes failed.

## Summary of changes

Add `wait_replica_caughtup` to wait for replica sync

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-12-18 19:15:38 +00:00
Alex Chi Z.
6d3e8096fc refactor(test): tighten up test_gc_feedback (#10126)
## Problem

In https://github.com/neondatabase/neon/pull/8103 we changed the test
case to have more test coverage of gc_compaction. Now that we have
`test_gc_compaction_smoke`, we can revert this test case to serve its
original purpose and revert the parameter changes.

part of https://github.com/neondatabase/neon/issues/9114

## Summary of changes

* Revert pitr_interval from 60s to 10s.
* Assert the physical/logical size ratio in the benchmark.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-12-18 18:10:05 +00:00
Alex Chi Z.
3d1c3a80ae feat(pageserver): add compact queue http endpoint (#10173)
## Problem

We cannot get the size of the compaction queue and access the info.

Part of #9114 

## Summary of changes

* Add an API endpoint to get the compaction queue.
* gc_compaction test case now waits until the compaction finishes.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-12-18 18:09:02 +00:00
John Spray
835287ba3a neon_local: add a flock to protect against concurrent execution (#10185)
## Problem

`neon_local` has always been unsafe to run concurrently with itself: it
uses simple text files for persistent state, and concurrent runs will
step on each other.

In some test environments we intentionally handle this with mutexes in
python land, but it's fragile to try and always remember to do that.

## Summary of changes

- Add a `flock` based mutex around the `main` function of neon_local,
using the repo directory as the file to lock
- Clean up an Option<> around control_plane_api, this is a drive-by
change because it was one of the fields that had a weird effect when
previous concurrent stuff stamped on it.
2024-12-18 16:29:47 +00:00
Conrad Ludgate
d63602cc78 chore(proxy): fully remove allow-self-signed-compute flag (#10168)
When https://github.com/neondatabase/cloud/pull/21856 is merged, this
flag is no longer necessary.
2024-12-18 16:03:14 +00:00
Erik Grinaker
1668d39b7c safekeeper: fix typo in allowlist for /profile/heap (#10186) 2024-12-18 15:51:53 +00:00
Alex Chi Z.
1d12efc428 fix(pageserver): allow repartition errors during gc-compaction smoke tests (#10164)
## Problem

part of https://github.com/neondatabase/neon/issues/9114

In https://github.com/neondatabase/neon/pull/10127 we fixed the race,
but we didn't add the errors to the allowlist.

## Summary of changes

* Allow repartition errors in the gc-compaction smoke test.

I think it might be worth to refactor the code to allow multiple threads
getting a copy of repartition status (i.e., using Rcu) in the future.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-12-18 15:37:26 +00:00
Arpad Müller
85696297c5 Add safekeepers command to storcon_cli for listing (#10151)
Add a `safekeepers` subcommand to `storcon_cli` that allows listing the
safekeepers.

```
$ curl -X POST --url http://localhost:1234/control/v1/safekeeper/42 --data \
  '{"active":true, "id":42, "created_at":"2023-10-25T09:11:25Z", "updated_at":"2024-08-28T11:32:43Z","region_id":"neon_local","host":"localhost","port":5454,"http_port":0,"version":123,"availability_zone_id":"us-east-2b"}'
$ cargo run --bin storcon_cli  -- --api http://localhost:1234 safekeepers
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.38s
     Running `target/debug/storcon_cli --api 'http://localhost:1234' safekeepers`
+----+---------+-----------+------+-----------+------------+
| Id | Version | Host      | Port | Http Port | AZ Id      |
+==========================================================+
| 42 | 123     | localhost | 5454 | 0         | us-east-2b |
+----+---------+-----------+------+-----------+------------+
```

Also:

* Don't return the raw `SafekeeperPersistence` struct that contains the
raw database presentation, but instead a new
`SafekeeperDescribeResponse` struct.
* The `SafekeeperPersistence` struct leaves out the `active` field on
purpose because we want to deprecate it and replace it with a
`scheduling_policy` one.

Part of https://github.com/neondatabase/neon/issues/9981
2024-12-18 12:47:56 +00:00
Konstantin Knizhnik
aaf980f70d Online checkpoint replication state (#9976)
## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1733180965970089

Replication state is checkpointed only by shutdown checkpoint.
It means that replication snapshots are not removed till compute
shutdown.

## Summary of changes

Checkpoint replication state during online checkpoint

Related Postgres PR:
https://github.com/neondatabase/postgres/pull/546

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-12-18 09:34:38 +00:00
a-masterov
c52514ab02 Fix allure report creation on periodic pg_regress testing (#10171)
## Problem
The allure report finishes with the error `HttpError: Resource not
accessible by integration` while running the `pg_regress` test against a
cloud staging project due to a lack of permissions.
## Summary of changes
The permissions are added.
2024-12-17 20:47:44 +00:00
Conrad Ludgate
2ee6bc5ec4 chore(proxy): update vendored postgres libs to edition 2021 (#10139)
I ran `cargo fix --edition` in each project prior, and it found nothing
that needed fixing.
2024-12-17 20:06:18 +00:00
John Spray
fd230227f2 storcon: include preferred AZ in compute notifications (#9953)
## Problem

It is unreliable for the control plane to infer the AZ for computes from
where the tenant is currently attached, because if a tenant happens to
be in a degraded state or a release is ongoing while a compute starts,
then the tenant's attached AZ can be a different one to where it will
run long-term, and the control plane doesn't check back later to restart
the compute.

This can land in parallel with
https://github.com/neondatabase/neon/pull/9947

## Summary of changes

- Thread through the preferred AZ into the compute hook code via the
reconciler
- Include the preferred AZ in the body of compute hook notifications
2024-12-17 20:04:09 +00:00
119 changed files with 2272 additions and 1069 deletions

View File

@@ -169,7 +169,7 @@ runs:
fi
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
cov_prefix=()
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
else
cov_prefix=()
fi

View File

@@ -90,7 +90,7 @@ jobs:
run: |
CARGO_FEATURES="--features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix=""
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=""

View File

@@ -308,6 +308,7 @@ jobs:
"image": [ "'"$image_default"'" ],
"include": [{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new-many-tables","db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "50gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb","runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
@@ -410,7 +411,7 @@ jobs:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-new-many-tables", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
@@ -429,7 +430,7 @@ jobs:
neonvm-captest-sharding-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_SHARDING_CONNSTR }}
;;
neonvm-captest-new | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
neonvm-captest-new | neonvm-captest-new-many-tables | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
@@ -446,6 +447,26 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
# we want to compare Neon project OLTP throughput and latency at scale factor 10 GB
# without (neonvm-captest-new)
# and with (neonvm-captest-new-many-tables) many relations in the database
- name: Create many relations before the run
if: contains(fromJson('["neonvm-captest-new-many-tables"]'), matrix.platform)
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_perf_many_relations
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_NUM_RELATIONS: 10000
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:

View File

@@ -212,7 +212,7 @@ jobs:
fi
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
run: cargo hack --features default --ignore-unknown-features --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Check documentation generation
run: cargo doc --workspace --no-deps --document-private-items

View File

@@ -21,6 +21,8 @@ concurrency:
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
jobs:
regress:

9
Cargo.lock generated
View File

@@ -1274,6 +1274,7 @@ dependencies = [
"chrono",
"clap",
"compute_api",
"fail",
"flate2",
"futures",
"hyper 0.14.30",
@@ -1732,9 +1733,9 @@ checksum = "ab03c107fafeb3ee9f5925686dbb7a73bc76e3932abb0d2b365cb64b169cf04c"
[[package]]
name = "diesel"
version = "2.2.3"
version = "2.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65e13bab2796f412722112327f3e575601a3e9cdcbe426f0d30dbf43f3f5dc71"
checksum = "ccf1bedf64cdb9643204a36dd15b19a6ce8e7aa7f7b105868e9f1fad5ffa7d12"
dependencies = [
"bitflags 2.4.1",
"byteorder",
@@ -4493,9 +4494,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pq-sys"
version = "0.4.8"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793"
dependencies = [
"vcpkg",
]

View File

@@ -1285,7 +1285,7 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
#########################################################################################
#
# Compile and run the Neon-specific `compute_ctl` and `fast_import` binaries
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
@@ -1295,7 +1295,7 @@ ENV BUILD_TAG=$BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cd compute_tools && mold -run cargo build --locked --profile release-line-debug-size-lto
RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy
#########################################################################################
#
@@ -1338,20 +1338,6 @@ RUN set -e \
&& make -j $(nproc) dist_man_MANS= \
&& make install dist_man_MANS=
#########################################################################################
#
# Compile the Neon-specific `local_proxy` binary
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS local_proxy
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin local_proxy
#########################################################################################
#
# Layers "postgres-exporter" and "sql-exporter"
@@ -1491,7 +1477,7 @@ COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini
# local_proxy and its config
COPY --from=local_proxy --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy
RUN mkdir -p /etc/local_proxy && chown postgres:postgres /etc/local_proxy
# Metrics exporter binaries and configuration files
@@ -1556,28 +1542,30 @@ RUN apt update && \
locales \
procps \
ca-certificates \
curl \
unzip \
$VERSION_INSTALLS && \
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
# s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2
# used by fast_import
# aws cli is used by fast_import (curl and unzip above are at this time only used for this installation step)
ARG TARGETARCH
ADD https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_linux_$TARGETARCH.deb /tmp/s5cmd.deb
RUN set -ex; \
\
# Determine the expected checksum based on TARGETARCH
if [ "${TARGETARCH}" = "amd64" ]; then \
CHECKSUM="392c385320cd5ffa435759a95af77c215553d967e4b1c0fffe52e4f14c29cf85"; \
TARGETARCH_ALT="x86_64"; \
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
elif [ "${TARGETARCH}" = "arm64" ]; then \
CHECKSUM="939bee3cf4b5604ddb00e67f8c157b91d7c7a5b553d1fbb6890fad32894b7b46"; \
TARGETARCH_ALT="aarch64"; \
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
else \
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
fi; \
\
# Compute and validate the checksum
echo "${CHECKSUM} /tmp/s5cmd.deb" | sha256sum -c -
RUN dpkg -i /tmp/s5cmd.deb && rm /tmp/s5cmd.deb
curl -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
/tmp/awscliv2/aws/install; \
rm -rf /tmp/awscliv2.zip /tmp/awscliv2; \
true
ENV LANG=en_US.utf8
USER postgres

View File

@@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
# Enables test specific features.
testing = []
testing = ["fail/failpoints"]
[dependencies]
base64.workspace = true
@@ -19,6 +19,7 @@ camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
fail.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }

View File

@@ -67,12 +67,15 @@ use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use utils::failpoint_support;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
fn main() -> Result<()> {
let scenario = failpoint_support::init();
let (build_tag, clap_args) = init()?;
// enable core dumping for all child processes
@@ -100,6 +103,8 @@ fn main() -> Result<()> {
maybe_delay_exit(delay_exit);
scenario.teardown();
deinit_and_exit(wait_pg_result);
}
@@ -419,9 +424,13 @@ fn start_postgres(
"running compute with features: {:?}",
state.pspec.as_ref().unwrap().spec.features
);
// before we release the mutex, fetch the swap size (if any) for later.
let swap_size_bytes = state.pspec.as_ref().unwrap().spec.swap_size_bytes;
let disk_quota_bytes = state.pspec.as_ref().unwrap().spec.disk_quota_bytes;
// before we release the mutex, fetch some parameters for later.
let &ComputeSpec {
swap_size_bytes,
disk_quota_bytes,
disable_lfc_resizing,
..
} = &state.pspec.as_ref().unwrap().spec;
drop(state);
// Launch remaining service threads
@@ -526,11 +535,18 @@ fn start_postgres(
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();
// don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
None
} else {
file_cache_connstr.cloned()
};
let vm_monitor = rt.as_ref().map(|rt| {
rt.spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: cgroup.cloned(),
pgconnstr: file_cache_connstr.cloned(),
pgconnstr,
addr: vm_monitor_addr.clone(),
})),
token.clone(),

View File

@@ -34,12 +34,12 @@ use nix::unistd::Pid;
use tracing::{info, info_span, warn, Instrument};
use utils::fs_ext::is_directory_empty;
#[path = "fast_import/aws_s3_sync.rs"]
mod aws_s3_sync;
#[path = "fast_import/child_stdio_to_log.rs"]
mod child_stdio_to_log;
#[path = "fast_import/s3_uri.rs"]
mod s3_uri;
#[path = "fast_import/s5cmd.rs"]
mod s5cmd;
#[derive(clap::Parser)]
struct Args {
@@ -326,7 +326,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
}
info!("upload pgdata");
s5cmd::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/"))
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
.await
.context("sync dump directory to destination")?;
@@ -334,10 +334,10 @@ pub(crate) async fn main() -> anyhow::Result<()> {
{
let status_dir = working_directory.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
let status_file = status_dir.join("status");
let status_file = status_dir.join("pgdata");
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
.context("write status file")?;
s5cmd::sync(&status_file, &s3_prefix.append("/status/pgdata"))
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
.await
.context("sync status directory to destination")?;
}

View File

@@ -4,24 +4,21 @@ use camino::Utf8Path;
use super::s3_uri::S3Uri;
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
let mut builder = tokio::process::Command::new("s5cmd");
// s5cmd uses aws-sdk-go v1, hence doesn't support AWS_ENDPOINT_URL
if let Some(val) = std::env::var_os("AWS_ENDPOINT_URL") {
builder.arg("--endpoint-url").arg(val);
}
let mut builder = tokio::process::Command::new("aws");
builder
.arg("s3")
.arg("sync")
.arg(local.as_str())
.arg(remote.to_string());
let st = builder
.spawn()
.context("spawn s5cmd")?
.context("spawn aws s3 sync")?
.wait()
.await
.context("wait for s5cmd")?;
.context("wait for aws s3 sync")?;
if st.success() {
Ok(())
} else {
Err(anyhow::anyhow!("s5cmd failed"))
Err(anyhow::anyhow!("aws s3 sync failed"))
}
}

View File

@@ -1181,8 +1181,19 @@ impl ComputeNode {
let mut conf = postgres::config::Config::from(conf);
conf.application_name("compute_ctl:migrations");
let mut client = conf.connect(NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
match conf.connect(NoTls) {
Ok(mut client) => {
if let Err(e) = handle_migrations(&mut client) {
error!("Failed to run migrations: {}", e);
}
}
Err(e) => {
error!(
"Failed to connect to the compute for running migrations: {}",
e
);
}
};
});
Ok::<(), anyhow::Error>(())

View File

@@ -24,8 +24,11 @@ use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
use utils::failpoint_support::failpoints_handler;
use utils::http::error::ApiError;
use utils::http::request::must_get_query_param;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
@@ -310,6 +313,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
(&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
match failpoints_handler(req, CancellationToken::new()).await {
Ok(r) => r,
Err(ApiError::BadRequest(e)) => {
render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
}
Err(_) => {
render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);

View File

@@ -1,13 +1,16 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::Client;
use tracing::info;
/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
}
impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);
@@ -15,6 +18,7 @@ impl<'m> MigrationRunner<'m> {
Self { client, migrations }
}
/// Get the current value neon_migration.migration_id
fn get_migration_id(&mut self) -> Result<i64> {
let query = "SELECT id FROM neon_migration.migration_id";
let row = self
@@ -25,37 +29,61 @@ impl<'m> MigrationRunner<'m> {
Ok(row.get::<&str, i64>("id"))
}
/// Update the neon_migration.migration_id value
///
/// This function has a fail point called compute-migration, which can be
/// used if you would like to fail the application of a series of migrations
/// at some point.
fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
let setval = format!("UPDATE neon_migration.migration_id SET id={}", migration_id);
// We use this fail point in order to check that failing in the
// middle of applying a series of migrations fails in an expected
// manner
if cfg!(feature = "testing") {
let fail = (|| {
fail_point!("compute-migration", |fail_migration_id| {
migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
});
false
})();
if fail {
return Err(anyhow::anyhow!(format!(
"migration {} was configured to fail because of a failpoint",
migration_id
)));
}
}
self.client
.simple_query(&setval)
.query(
"UPDATE neon_migration.migration_id SET id = $1",
&[&migration_id],
)
.context("run_migrations update id")?;
Ok(())
}
fn prepare_migrations(&mut self) -> Result<()> {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
self.client.simple_query(query)?;
let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
self.client.simple_query(query)?;
let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
self.client.simple_query(query)?;
let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
self.client.simple_query(query)?;
let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
self.client.simple_query(query)?;
/// Prepare the migrations the target database for handling migrations
fn prepare_database(&mut self) -> Result<()> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
self.client.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
)?;
self.client
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
self.client
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
Ok(())
}
/// Run the configrured set of migrations
pub fn run_migrations(mut self) -> Result<()> {
self.prepare_migrations()?;
self.prepare_database()?;
let mut current_migration = self.get_migration_id()? as usize;
while current_migration < self.migrations.len() {
@@ -69,6 +97,11 @@ impl<'m> MigrationRunner<'m> {
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id!(current_migration));
// Even though we are skipping the migration, updating the
// migration ID should help keep logic easy to understand when
// trying to understand the state of a cluster.
self.update_migration_id(migration_id!(current_migration))?;
} else {
info!(
"Running migration id={}:\n{}\n",
@@ -87,7 +120,6 @@ impl<'m> MigrationRunner<'m> {
)
})?;
// Migration IDs start at 1
self.update_migration_id(migration_id!(current_migration))?;
self.client

View File

@@ -0,0 +1,9 @@
DO $$
DECLARE
bypassrls boolean;
BEGIN
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
IF NOT bypassrls THEN
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
END IF;
END $$;

View File

@@ -0,0 +1,25 @@
DO $$
DECLARE
role record;
BEGIN
FOR role IN
SELECT rolname AS name, rolinherit AS inherit
FROM pg_roles
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
IF NOT role.inherit THEN
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
END IF;
END LOOP;
FOR role IN
SELECT rolname AS name, rolbypassrls AS bypassrls
FROM pg_roles
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT starts_with(rolname, 'pg_')
LOOP
IF role.bypassrls THEN
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
END IF;
END LOOP;
END $$;

View File

@@ -0,0 +1,10 @@
DO $$
BEGIN
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
RETURN;
END IF;
IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
END IF;
END $$;

View File

@@ -0,0 +1,19 @@
DO $$
DECLARE
monitor record;
BEGIN
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
admin_option AS admin
INTO monitor
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'pg_monitor'::regrole;
IF NOT monitor.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
END IF;
IF NOT monitor.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
END IF;
END $$;

View File

@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.

View File

@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.

View File

@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.

View File

@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.

View File

@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.

View File

@@ -0,0 +1,13 @@
DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
INTO can_execute
FROM pg_proc
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
END IF;
END $$;

View File

@@ -0,0 +1,13 @@
DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT has_function_privilege('neon_superuser', oid, 'execute')
INTO can_execute
FROM pg_proc
WHERE proname = 'pg_show_replication_origin_status'
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_show_replication_origin_status';
END IF;
END $$;

View File

@@ -19,6 +19,7 @@ use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{flock, FlockArg};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -36,6 +37,8 @@ use safekeeper_api::{
};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
@@ -689,6 +692,21 @@ struct TimelineTreeEl {
pub children: BTreeSet<TimelineId>,
}
/// A flock-based guard over the neon_local repository directory
struct RepoLock {
_file: File,
}
impl RepoLock {
fn new() -> Result<Self> {
let repo_dir = File::open(local_env::base_path())?;
let repo_dir_fd = repo_dir.as_raw_fd();
flock(repo_dir_fd, FlockArg::LockExclusive)?;
Ok(Self { _file: repo_dir })
}
}
// Main entry point for the 'neon_local' CLI utility
//
// This utility helps to manage neon installation. That includes following:
@@ -700,9 +718,14 @@ fn main() -> Result<()> {
let cli = Cli::parse();
// Check for 'neon init' command first.
let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command {
handle_init(&args).map(|env| Some(Cow::Owned(env)))
let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
(handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
} else {
// This tool uses a collection of simple files to store its state, and consequently
// it is not generally safe to run multiple commands concurrently. Rather than expect
// all callers to know this, use a lock file to protect against concurrent execution.
let _repo_lock = RepoLock::new().unwrap();
// all other commands need an existing config
let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
let original_env = env.clone();
@@ -728,11 +751,12 @@ fn main() -> Result<()> {
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
if &original_env != env {
let subcommand_result = if &original_env != env {
subcommand_result.map(|()| Some(Cow::Borrowed(env)))
} else {
subcommand_result.map(|()| None)
}
};
(subcommand_result, Some(_repo_lock))
};
match subcommand_result {
@@ -922,7 +946,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
} else {
// User (likely interactive) did not provide a description of the environment, give them the default
NeonLocalInitConf {
control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
broker: NeonBroker {
listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
},
@@ -1718,18 +1742,15 @@ async fn handle_start_all_impl(
broker::start_broker_process(env, &retry_timeout).await
});
// Only start the storage controller if the pageserver is configured to need it
if env.control_plane_api.is_some() {
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
retry_timeout,
))
.await
.map_err(|e| e.context("start storage_controller"))
});
}
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
retry_timeout,
))
.await
.map_err(|e| e.context("start storage_controller"))
});
for ps_conf in &env.pageservers {
js.spawn(async move {
@@ -1774,10 +1795,6 @@ async fn neon_start_status_check(
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
if env.control_plane_api.is_none() {
return Ok(());
}
let storcon = StorageController::from_env(env);
let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();

View File

@@ -316,6 +316,10 @@ impl Endpoint {
// and can cause errors like 'no unpinned buffers available', see
// <https://github.com/neondatabase/neon/issues/9956>
conf.append("shared_buffers", "1MB");
// Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
// batching logic. Set this to 2 so that we exercise the code a bit without letting
// individual tests do a lot of concurrent work on underpowered test machines
conf.append("effective_io_concurrency", "2");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_level", "logical");
@@ -581,6 +585,7 @@ impl Endpoint {
features: self.features.clone(),
swap_size_bytes: None,
disk_quota_bytes: None,
disable_lfc_resizing: None,
cluster: Cluster {
cluster_id: None, // project ID: not used
name: None, // project name: not used

View File

@@ -76,7 +76,7 @@ pub struct LocalEnv {
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
pub control_plane_api: Option<Url>,
pub control_plane_api: Url,
// Control plane upcall API for storage controller. If set, this will be propagated into the
// storage controller's configuration.
@@ -133,7 +133,7 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub control_plane_api: Option<Option<Url>>,
pub control_plane_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Option<Url>>,
}
@@ -180,7 +180,7 @@ impl NeonStorageControllerConf {
const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
// Very tight heartbeat interval to speed up tests
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000);
}
impl Default for NeonStorageControllerConf {
@@ -535,7 +535,7 @@ impl LocalEnv {
storage_controller,
pageservers,
safekeepers,
control_plane_api,
control_plane_api: control_plane_api.unwrap(),
control_plane_compute_hook_api,
branch_name_mappings,
}
@@ -638,7 +638,7 @@ impl LocalEnv {
storage_controller: self.storage_controller.clone(),
pageservers: vec![], // it's skip_serializing anyway
safekeepers: self.safekeepers.clone(),
control_plane_api: self.control_plane_api.clone(),
control_plane_api: Some(self.control_plane_api.clone()),
control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
branch_name_mappings: self.branch_name_mappings.clone(),
},
@@ -768,7 +768,7 @@ impl LocalEnv {
storage_controller: storage_controller.unwrap_or_default(),
pageservers: pageservers.iter().map(Into::into).collect(),
safekeepers,
control_plane_api: control_plane_api.unwrap_or_default(),
control_plane_api: control_plane_api.unwrap(),
control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
branch_name_mappings: Default::default(),
};

View File

@@ -95,21 +95,19 @@ impl PageServerNode {
let mut overrides = vec![pg_distrib_dir_param, broker_endpoint_param];
if let Some(control_plane_api) = &self.env.control_plane_api {
overrides.push(format!(
"control_plane_api='{}'",
control_plane_api.as_str()
));
overrides.push(format!(
"control_plane_api='{}'",
self.env.control_plane_api.as_str()
));
// Storage controller uses the same auth as pageserver: if JWT is enabled
// for us, we will also need it to talk to them.
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
let jwt_token = self
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
}
// Storage controller uses the same auth as pageserver: if JWT is enabled
// for us, we will also need it to talk to them.
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
let jwt_token = self
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
}
if !conf.other.contains_key("remote_storage") {

View File

@@ -338,7 +338,7 @@ impl StorageController {
.port(),
)
} else {
let listen_url = self.env.control_plane_api.clone().unwrap();
let listen_url = self.env.control_plane_api.clone();
let listen = format!(
"{}:{}",
@@ -708,7 +708,7 @@ impl StorageController {
} else {
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let listen_url = self.env.control_plane_api.clone().unwrap();
let listen_url = self.env.control_plane_api.clone();
Url::from_str(&format!(
"http://{}:{}/{path}",
listen_url.host_str().unwrap(),

View File

@@ -5,7 +5,8 @@ use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
SafekeeperDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -211,6 +212,8 @@ enum Command {
#[arg(long)]
timeout: humantime::Duration,
},
/// List safekeepers known to the storage controller
Safekeepers {},
}
#[derive(Parser)]
@@ -1020,6 +1023,31 @@ async fn main() -> anyhow::Result<()> {
"Fill was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
);
}
Command::Safekeepers {} => {
let mut resp = storcon_client
.dispatch::<(), Vec<SafekeeperDescribeResponse>>(
Method::GET,
"control/v1/safekeeper".to_string(),
None,
)
.await?;
resp.sort_by(|a, b| a.id.cmp(&b.id));
let mut table = comfy_table::Table::new();
table.set_header(["Id", "Version", "Host", "Port", "Http Port", "AZ Id"]);
for sk in resp {
table.add_row([
format!("{}", sk.id),
format!("{}", sk.version),
sk.host,
format!("{}", sk.port),
format!("{}", sk.http_port),
sk.availability_zone_id.to_string(),
]);
}
println!("{table}");
}
}
Ok(())

View File

@@ -67,6 +67,15 @@ pub struct ComputeSpec {
#[serde(default)]
pub disk_quota_bytes: Option<u64>,
/// Disables the vm-monitor behavior that resizes LFC on upscale/downscale, instead relying on
/// the initial size of LFC.
///
/// This is intended for use when the LFC size is being overridden from the default but
/// autoscaling is still enabled, and we don't want the vm-monitor to interfere with the custom
/// LFC sizing.
#[serde(default)]
pub disable_lfc_resizing: Option<bool>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,

View File

@@ -372,6 +372,23 @@ pub struct MetadataHealthListOutdatedResponse {
pub health_records: Vec<MetadataHealthRecord>,
}
/// Publicly exposed safekeeper description
///
/// The `active` flag which we have in the DB is not included on purpose: it is deprecated.
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperDescribeResponse {
pub id: NodeId,
pub region_id: String,
/// 1 is special, it means just created (not currently posted to storcon).
/// Zero or negative is not really expected.
/// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
pub version: i64,
pub host: String,
pub port: i32,
pub http_port: i32,
pub availability_zone_id: String,
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -6,6 +6,7 @@ pub mod utilization;
use camino::Utf8PathBuf;
pub use utilization::PageserverUtilization;
use core::ops::Range;
use std::{
collections::HashMap,
fmt::Display,
@@ -28,6 +29,7 @@ use utils::{
};
use crate::{
key::Key,
reltag::RelTag,
shard::{ShardCount, ShardStripeSize, TenantShardId},
};
@@ -210,6 +212,68 @@ pub enum TimelineState {
Broken { reason: String, backtrace: String },
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactLsnRange {
pub start: Lsn,
pub end: Lsn,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactKeyRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}
impl From<Range<Lsn>> for CompactLsnRange {
fn from(range: Range<Lsn>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<Range<Key>> for CompactKeyRange {
fn from(range: Range<Key>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<CompactLsnRange> for Range<Lsn> {
fn from(range: CompactLsnRange) -> Self {
range.start..range.end
}
}
impl From<CompactKeyRange> for Range<Key> {
fn from(range: CompactKeyRange) -> Self {
range.start..range.end
}
}
impl CompactLsnRange {
pub fn above(lsn: Lsn) -> Self {
Self {
start: lsn,
end: Lsn::MAX,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct CompactInfoResponse {
pub compact_key_range: Option<CompactKeyRange>,
pub compact_lsn_range: Option<CompactLsnRange>,
pub sub_compaction: bool,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,

View File

@@ -106,11 +106,11 @@ impl<R: RecordGenerator> WalGenerator<R> {
const TIMELINE_ID: u32 = 1;
/// Creates a new WAL generator with the given record generator.
pub fn new(record_generator: R) -> WalGenerator<R> {
pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
Self {
record_generator,
lsn: Lsn(0),
prev_lsn: Lsn(0),
lsn: start_lsn,
prev_lsn: start_lsn,
}
}

View File

@@ -1,7 +1,7 @@
[package]
name = "postgres-protocol2"
version = "0.1.0"
edition = "2018"
edition = "2021"
license = "MIT/Apache-2.0"
[dependencies]

View File

@@ -9,8 +9,7 @@
//!
//! This library assumes that the `client_encoding` backend parameter has been
//! set to `UTF8`. It will most likely not behave properly if that is not the case.
#![doc(html_root_url = "https://docs.rs/postgres-protocol/0.6")]
#![warn(missing_docs, rust_2018_idioms, clippy::all)]
#![warn(missing_docs, clippy::all)]
use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};

View File

@@ -3,7 +3,6 @@
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, BytesMut};
use std::convert::TryFrom;
use std::error::Error;
use std::io;
use std::marker;

View File

@@ -1,7 +1,7 @@
[package]
name = "postgres-types2"
version = "0.1.0"
edition = "2018"
edition = "2021"
license = "MIT/Apache-2.0"
[dependencies]

View File

@@ -2,8 +2,7 @@
//!
//! This crate is used by the `tokio-postgres` and `postgres` crates. You normally don't need to depend directly on it
//! unless you want to define your own `ToSql` or `FromSql` definitions.
#![doc(html_root_url = "https://docs.rs/postgres-types/0.2")]
#![warn(clippy::all, rust_2018_idioms, missing_docs)]
#![warn(clippy::all, missing_docs)]
use fallible_iterator::FallibleIterator;
use postgres_protocol2::types;

View File

@@ -1,7 +1,7 @@
[package]
name = "tokio-postgres2"
version = "0.1.0"
edition = "2018"
edition = "2021"
license = "MIT/Apache-2.0"
[dependencies]

View File

@@ -33,10 +33,14 @@ pub struct Response {
#[derive(PartialEq, Debug)]
enum State {
Active,
Terminating,
Closing,
}
enum WriteReady {
Terminating,
WaitingOnRead,
}
/// A connection to a PostgreSQL database.
///
/// This is one half of what is returned when a new connection is established. It performs the actual IO with the
@@ -51,7 +55,6 @@ pub struct Connection<S, T> {
/// HACK: we need this in the Neon Proxy to forward params.
pub parameters: HashMap<String, String>,
receiver: mpsc::UnboundedReceiver<Request>,
pending_request: Option<RequestMessages>,
pending_responses: VecDeque<BackendMessage>,
responses: VecDeque<Response>,
state: State,
@@ -72,7 +75,6 @@ where
stream,
parameters,
receiver,
pending_request: None,
pending_responses,
responses: VecDeque::new(),
state: State::Active,
@@ -93,26 +95,23 @@ where
.map(|o| o.map(|r| r.map_err(Error::io)))
}
fn poll_read(&mut self, cx: &mut Context<'_>) -> Result<Option<AsyncMessage>, Error> {
if self.state != State::Active {
trace!("poll_read: done");
return Ok(None);
}
/// Read and process messages from the connection to postgres.
/// client <- postgres
fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<Result<AsyncMessage, Error>> {
loop {
let message = match self.poll_response(cx)? {
Poll::Ready(Some(message)) => message,
Poll::Ready(None) => return Err(Error::closed()),
Poll::Ready(None) => return Poll::Ready(Err(Error::closed())),
Poll::Pending => {
trace!("poll_read: waiting on response");
return Ok(None);
return Poll::Pending;
}
};
let (mut messages, request_complete) = match message {
BackendMessage::Async(Message::NoticeResponse(body)) => {
let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?;
return Ok(Some(AsyncMessage::Notice(error)));
return Poll::Ready(Ok(AsyncMessage::Notice(error)));
}
BackendMessage::Async(Message::NotificationResponse(body)) => {
let notification = Notification {
@@ -120,7 +119,7 @@ where
channel: body.channel().map_err(Error::parse)?.to_string(),
payload: body.message().map_err(Error::parse)?.to_string(),
};
return Ok(Some(AsyncMessage::Notification(notification)));
return Poll::Ready(Ok(AsyncMessage::Notification(notification)));
}
BackendMessage::Async(Message::ParameterStatus(body)) => {
self.parameters.insert(
@@ -139,8 +138,10 @@ where
let mut response = match self.responses.pop_front() {
Some(response) => response,
None => match messages.next().map_err(Error::parse)? {
Some(Message::ErrorResponse(error)) => return Err(Error::db(error)),
_ => return Err(Error::unexpected_message()),
Some(Message::ErrorResponse(error)) => {
return Poll::Ready(Err(Error::db(error)))
}
_ => return Poll::Ready(Err(Error::unexpected_message())),
},
};
@@ -164,18 +165,14 @@ where
request_complete,
});
trace!("poll_read: waiting on sender");
return Ok(None);
return Poll::Pending;
}
}
}
}
/// Fetch the next client request and enqueue the response sender.
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<RequestMessages>> {
if let Some(messages) = self.pending_request.take() {
trace!("retrying pending request");
return Poll::Ready(Some(messages));
}
if self.receiver.is_closed() {
return Poll::Ready(None);
}
@@ -193,74 +190,80 @@ where
}
}
fn poll_write(&mut self, cx: &mut Context<'_>) -> Result<bool, Error> {
/// Process client requests and write them to the postgres connection, flushing if necessary.
/// client -> postgres
fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<WriteReady, Error>> {
loop {
if self.state == State::Closing {
trace!("poll_write: done");
return Ok(false);
}
if Pin::new(&mut self.stream)
.poll_ready(cx)
.map_err(Error::io)?
.is_pending()
{
trace!("poll_write: waiting on socket");
return Ok(false);
// poll_ready is self-flushing.
return Poll::Pending;
}
let request = match self.poll_request(cx) {
Poll::Ready(Some(request)) => request,
Poll::Ready(None) if self.responses.is_empty() && self.state == State::Active => {
match self.poll_request(cx) {
// send the message to postgres
Poll::Ready(Some(RequestMessages::Single(request))) => {
Pin::new(&mut self.stream)
.start_send(request)
.map_err(Error::io)?;
}
// No more messages from the client, and no more responses to wait for.
// Send a terminate message to postgres
Poll::Ready(None) if self.responses.is_empty() => {
trace!("poll_write: at eof, terminating");
self.state = State::Terminating;
let mut request = BytesMut::new();
frontend::terminate(&mut request);
RequestMessages::Single(FrontendMessage::Raw(request.freeze()))
let request = FrontendMessage::Raw(request.freeze());
Pin::new(&mut self.stream)
.start_send(request)
.map_err(Error::io)?;
trace!("poll_write: sent eof, closing");
trace!("poll_write: done");
return Poll::Ready(Ok(WriteReady::Terminating));
}
// No more messages from the client, but there are still some responses to wait for.
Poll::Ready(None) => {
trace!(
"poll_write: at eof, pending responses {}",
self.responses.len()
);
return Ok(true);
ready!(self.poll_flush(cx))?;
return Poll::Ready(Ok(WriteReady::WaitingOnRead));
}
// Still waiting for a message from the client.
Poll::Pending => {
trace!("poll_write: waiting on request");
return Ok(true);
}
};
match request {
RequestMessages::Single(request) => {
Pin::new(&mut self.stream)
.start_send(request)
.map_err(Error::io)?;
if self.state == State::Terminating {
trace!("poll_write: sent eof, closing");
self.state = State::Closing;
}
ready!(self.poll_flush(cx))?;
return Poll::Pending;
}
}
}
}
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result<(), Error> {
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
match Pin::new(&mut self.stream)
.poll_flush(cx)
.map_err(Error::io)?
{
Poll::Ready(()) => trace!("poll_flush: flushed"),
Poll::Pending => trace!("poll_flush: waiting on socket"),
Poll::Ready(()) => {
trace!("poll_flush: flushed");
Poll::Ready(Ok(()))
}
Poll::Pending => {
trace!("poll_flush: waiting on socket");
Poll::Pending
}
}
Ok(())
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
if self.state != State::Closing {
return Poll::Pending;
}
match Pin::new(&mut self.stream)
.poll_close(cx)
.map_err(Error::io)?
@@ -289,18 +292,30 @@ where
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<AsyncMessage, Error>>> {
let message = self.poll_read(cx)?;
let want_flush = self.poll_write(cx)?;
if want_flush {
self.poll_flush(cx)?;
if self.state != State::Closing {
// if the state is still active, try read from and write to postgres.
let message = self.poll_read(cx)?;
let closing = self.poll_write(cx)?;
if let Poll::Ready(WriteReady::Terminating) = closing {
self.state = State::Closing;
}
if let Poll::Ready(message) = message {
return Poll::Ready(Some(Ok(message)));
}
// poll_read returned Pending.
// poll_write returned Pending or Ready(WriteReady::WaitingOnRead).
// if poll_write returned Ready(WriteReady::WaitingOnRead), then we are waiting to read more data from postgres.
if self.state != State::Closing {
return Poll::Pending;
}
}
match message {
Some(message) => Poll::Ready(Some(Ok(message))),
None => match self.poll_shutdown(cx) {
Poll::Ready(Ok(())) => Poll::Ready(None),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Pending => Poll::Pending,
},
match self.poll_shutdown(cx) {
Poll::Ready(Ok(())) => Poll::Ready(None),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Pending => Poll::Pending,
}
}
}

View File

@@ -1,5 +1,5 @@
//! An asynchronous, pipelined, PostgreSQL client.
#![warn(rust_2018_idioms, clippy::all)]
#![warn(clippy::all)]
pub use crate::cancel_token::CancelToken;
pub use crate::client::{Client, SocketConfig};

View File

@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;
/// Declare a failpoint that can use the `pause` failpoint action.
/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
#[macro_export]
macro_rules! pausable_failpoint {
@@ -181,7 +181,7 @@ pub async fn failpoints_handler(
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because storage was compiled without failpoints support"
"Cannot manage failpoints because neon was compiled without failpoints support"
)));
}

View File

@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::DEFAULT_PG_VERSION;
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
TimelineInfo,
CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
TimelineGcRequest, TimelineInfo,
};
use utils::{
auth::SwappableJwtAuth,
@@ -2039,6 +2039,34 @@ async fn timeline_cancel_compact_handler(
.await
}
// Get compact info of a timeline
async fn timeline_compact_info_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let res = tenant.get_scheduled_compaction_tasks(timeline_id);
let mut resp = Vec::new();
for item in res {
resp.push(CompactInfoResponse {
compact_key_range: item.compact_key_range,
compact_lsn_range: item.compact_lsn_range,
sub_compaction: item.sub_compaction,
});
}
json_response(StatusCode::OK, resp)
}
.instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}
// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
mut request: Request<Body>,
@@ -3400,6 +3428,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
|r| api_handler(r, timeline_gc_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_info_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_handler),

View File

@@ -3122,6 +3122,23 @@ impl Tenant {
}
}
pub(crate) fn get_scheduled_compaction_tasks(
&self,
timeline_id: TimelineId,
) -> Vec<CompactOptions> {
use itertools::Itertools;
let guard = self.scheduled_compaction_tasks.lock().unwrap();
guard
.get(&timeline_id)
.map(|tline_pending_tasks| {
tline_pending_tasks
.iter()
.map(|x| x.options.clone())
.collect_vec()
})
.unwrap_or_default()
}
/// Schedule a compaction task for a timeline.
pub(crate) async fn schedule_compaction(
&self,
@@ -5759,13 +5776,13 @@ mod tests {
use timeline::{CompactOptions, DeltaLayerTestDesc};
use utils::id::TenantId;
#[cfg(feature = "testing")]
use models::CompactLsnRange;
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
#[cfg(feature = "testing")]
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
#[cfg(feature = "testing")]
use timeline::CompactLsnRange;
#[cfg(feature = "testing")]
use timeline::GcInfo;
static TEST_KEY: Lazy<Key> =
@@ -9634,7 +9651,7 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
use timeline::CompactLsnRange;
use models::CompactLsnRange;
let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
let (tenant, ctx) = harness.load().await;

View File

@@ -1,12 +1,15 @@
use std::collections::BTreeSet;
use itertools::Itertools;
use pageserver_compaction::helpers::overlaps_with;
use super::storage_layer::LayerName;
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
///
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
/// The function implements a fast path check and a slow path check.
///
/// The fast path checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// | | | |
@@ -25,31 +28,47 @@ use super::storage_layer::LayerName;
/// | | | 4 | | |
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
///
/// However, if a partial compaction is still going on, it is possible that we get a layer map not satisfying the above condition.
/// Therefore, we fallback to simply check if any of the two delta layers overlap. (See "A slow path...")
pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
let mut all_delta_layers = Vec::new();
for name in metadata {
if let LayerName::Delta(layer) = name {
if layer.key_range.start.next() != layer.key_range.end {
all_delta_layers.push(layer.clone());
}
all_delta_layers.push(layer.clone());
}
}
for layer in &all_delta_layers {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
if layer.key_range.start.next() != layer.key_range.end {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
}
}
for layer in &all_delta_layers {
for (idx, layer) in all_delta_layers.iter().enumerate() {
if layer.key_range.start.next() == layer.key_range.end {
continue;
}
let lsn_range = layer.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
layer,
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
return Some(err);
// A slow path to check if the layer intersects with any other delta layer.
for (other_idx, other_layer) in all_delta_layers.iter().enumerate() {
if other_idx == idx {
// do not check self intersects with self
continue;
}
if overlaps_with(&layer.lsn_range, &other_layer.lsn_range)
&& overlaps_with(&layer.key_range, &other_layer.key_range)
{
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with layer {}",
layer, other_layer
);
return Some(err);
}
}
}
}
None

View File

@@ -31,9 +31,9 @@ use pageserver_api::{
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
LsnLease, TimelineState,
CompactKeyRange, CompactLsnRange, CompactionAlgorithm, CompactionAlgorithmSettings,
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -788,63 +788,6 @@ pub(crate) struct CompactRequest {
pub sub_compaction_max_job_size_mb: Option<u64>,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactLsnRange {
pub start: Lsn,
pub end: Lsn,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactKeyRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}
impl From<Range<Lsn>> for CompactLsnRange {
fn from(range: Range<Lsn>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<Range<Key>> for CompactKeyRange {
fn from(range: Range<Key>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<CompactLsnRange> for Range<Lsn> {
fn from(range: CompactLsnRange) -> Self {
range.start..range.end
}
}
impl From<CompactKeyRange> for Range<Key> {
fn from(range: CompactKeyRange) -> Self {
range.start..range.end
}
}
impl CompactLsnRange {
#[cfg(test)]
#[cfg(feature = "testing")]
pub fn above(lsn: Lsn) -> Self {
Self {
start: lsn,
end: Lsn::MAX,
}
}
}
#[derive(Debug, Clone, Default)]
pub(crate) struct CompactOptions {
pub flags: EnumSet<CompactFlags>,

View File

@@ -29,6 +29,7 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -1823,7 +1824,7 @@ impl Timeline {
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let ((dense_ks, sparse_ks), _) = {
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock");
bail!("failed to acquire partition lock during gc-compaction");
};
partition.clone()
};
@@ -2156,15 +2157,14 @@ impl Timeline {
// Step 1: construct a k-merge iterator over all layers.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
// disable the check for now because we need to adjust the check for partial compactions, will enable later.
// let layer_names = job_desc
// .selected_layers
// .iter()
// .map(|layer| layer.layer_desc().layer_name())
// .collect_vec();
// if let Some(err) = check_valid_layermap(&layer_names) {
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
// }
let layer_names = job_desc
.selected_layers
.iter()
.map(|layer| layer.layer_desc().layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
}
// The maximum LSN we are processing in this compaction loop
let end_lsn = job_desc
.selected_layers
@@ -2546,13 +2546,48 @@ impl Timeline {
);
// Step 3: Place back to the layer map.
// First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
let all_layers = {
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
let mut final_layers = all_layers
.iter()
.map(|layer| layer.layer_name())
.collect::<HashSet<_>>();
for layer in &layer_selection {
final_layers.remove(&layer.layer_desc().layer_name());
}
for layer in &compact_to {
final_layers.insert(layer.layer_desc().layer_name());
}
let final_layers = final_layers.into_iter().collect_vec();
// TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
// the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
// in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
if let Some(err) = check_valid_layermap(&final_layers) {
bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
}
// Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
// operate on L1 layers.
{
// TODO: sanity check if the layer map is valid (i.e., should not have overlaps)
let mut guard = self.layers.write().await;
guard
.open_mut()?
.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
};
// Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
// Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
// find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
// be batched into `schedule_compaction_update`.
let disk_consistent_lsn = self.disk_consistent_lsn.load();
self.schedule_uploads(disk_consistent_lsn, None)?;
self.remote_client
.schedule_compaction_update(&layer_selection, &compact_to)?;

View File

@@ -541,6 +541,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
else
{
LWLockRelease(lfc_lock);
return found;
}

View File

@@ -827,7 +827,6 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
{
while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
shard->n_reconnect_attempts += 1;
}
shard->n_reconnect_attempts = 0;

View File

@@ -102,23 +102,39 @@ User can pass several optional headers that will affect resulting json.
2. `Neon-Array-Mode: true`. Return postgres rows as arrays instead of objects. That is more compact representation and also helps in some edge
cases where it is hard to use rows represented as objects (e.g. when several fields have the same name).
## Test proxy locally
## Using SNI-based routing on localhost
Now proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so I usually use `*.localtest.me` which resolves to `127.0.0.1`. Now we can create self-signed certificate and play with proxy:
Proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so we can use *.localtest.me` which resolves to `127.0.0.1`.
Let's create self-signed certificate by running:
```sh
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.localtest.me"
```
start proxy
Then we need to build proxy with 'testing' feature and run, e.g.:
```sh
./target/debug/proxy -c server.crt -k server.key
RUST_LOG=proxy cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://proxy:password@endpoint.localtest.me:5432/postgres' --is-private-access-proxy true -c server.crt -k server.key
```
and connect to it
We will also need to have a postgres instance. Assuming that we have setted up docker we can set it up as follows:
```sh
docker run \
--detach \
--name proxy-postgres \
--env POSTGRES_PASSWORD=proxy-postgres \
--publish 5432:5432 \
postgres:17-bookworm
```
Next step is setting up auth table and schema as well as creating role (without the JWT table):
```sh
docker exec -it proxy-postgres psql -U postgres -c "CREATE SCHEMA IF NOT EXISTS neon_control_plane"
docker exec -it proxy-postgres psql -U postgres -c "CREATE TABLE neon_control_plane.endpoints (endpoint_id VARCHAR(255) PRIMARY KEY, allowed_ips VARCHAR(255))"
docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPERUSER LOGIN PASSWORD 'password';"
```
Now from client you can start a new session:
```sh
PGSSLROOTCERT=./server.crt psql 'postgres://my-cluster-42.localtest.me:1234?sslmode=verify-full'
```
PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.localtest.me:4432/postgres?sslmode=verify-full"
```

View File

@@ -678,6 +678,9 @@ mod tests {
.await
.unwrap();
// flush the final server message
stream.flush().await.unwrap();
handle.await.unwrap();
}

View File

@@ -10,7 +10,6 @@ use tracing::info;
use super::backend::ComputeCredentialKeys;
use super::{AuthError, PasswordHackPayload};
use crate::config::TlsServerEndPoint;
use crate::context::RequestContext;
use crate::control_plane::AuthSecret;
use crate::intern::EndpointIdInt;
@@ -18,6 +17,7 @@ use crate::sasl;
use crate::scram::threadpool::ThreadPool;
use crate::scram::{self};
use crate::stream::{PqStream, Stream};
use crate::tls::TlsServerEndPoint;
/// Every authentication selector is supposed to implement this trait.
pub(crate) trait AuthMethod {

View File

@@ -13,7 +13,10 @@ use proxy::auth::backend::jwt::JwkCache;
use proxy::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
use proxy::auth::{self};
use proxy::cancellation::CancellationHandlerMain;
use proxy::config::{self, AuthenticationConfig, HttpConfig, ProxyConfig, RetryConfig};
use proxy::config::{
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
};
use proxy::conn::TokioTcpAcceptor;
use proxy::control_plane::locks::ApiLocks;
use proxy::control_plane::messages::{EndpointJwksResponse, JwksSettings};
use proxy::http::health_server::AppMetrics;
@@ -25,6 +28,7 @@ use proxy::rate_limiter::{
use proxy::scram::threadpool::ThreadPool;
use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::{self, GlobalConnPoolOptions};
use proxy::tls::client_config::compute_client_config_with_root_certs;
use proxy::types::RoleName;
use proxy::url::ApiUrl;
@@ -33,7 +37,6 @@ project_build_tag!(BUILD_TAG);
use clap::Parser;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -163,8 +166,8 @@ async fn main() -> anyhow::Result<()> {
}
};
let metrics_listener = TcpListener::bind(args.metrics).await?.into_std()?;
let http_listener = TcpListener::bind(args.http).await?;
let metrics_listener = TokioTcpAcceptor::bind(args.metrics).await?;
let http_listener = TokioTcpAcceptor::bind(args.http).await?;
let shutdown = CancellationToken::new();
// todo: should scale with CU
@@ -209,6 +212,7 @@ async fn main() -> anyhow::Result<()> {
http_listener,
shutdown.clone(),
Arc::new(CancellationHandlerMain::new(
&config.connect_to_compute,
Arc::new(DashMap::new()),
None,
proxy::metrics::CancellationSource::Local,
@@ -268,10 +272,15 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes,
};
let compute_config = ComputeConfig {
retry: RetryConfig::parse(RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)?,
tls: Arc::new(compute_client_config_with_root_certs()?),
timeout: Duration::from_secs(2),
};
Ok(Box::leak(Box::new(ProxyConfig {
tls_config: None,
metric_collection: None,
allow_self_signed_compute: false,
http_config,
authentication_config: AuthenticationConfig {
jwks_cache: JwkCache::default(),
@@ -290,9 +299,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
region: "local".into(),
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,
connect_compute_locks,
connect_to_compute_retry_config: RetryConfig::parse(
RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES,
)?,
connect_to_compute: compute_config,
})))
}

View File

@@ -10,12 +10,13 @@ use clap::Arg;
use futures::future::Either;
use futures::TryFutureExt;
use itertools::Itertools;
use proxy::config::TlsServerEndPoint;
use proxy::conn::{Acceptor, TokioTcpAcceptor};
use proxy::context::RequestContext;
use proxy::metrics::{Metrics, ThreadPoolMetrics};
use proxy::protocol2::ConnectionInfo;
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
use proxy::stream::{PqStream, Stream};
use proxy::tls::TlsServerEndPoint;
use rustls::crypto::ring;
use rustls::pki_types::PrivateKeyDer;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -122,7 +123,7 @@ async fn main() -> anyhow::Result<()> {
// Start listening for incoming client connections
let proxy_address: SocketAddr = args.get_one::<String>("listen").unwrap().parse()?;
info!("Starting sni router on {proxy_address}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let proxy_listener = TokioTcpAcceptor::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
@@ -152,17 +153,13 @@ async fn task_main(
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
tls_server_end_point: TlsServerEndPoint,
listener: tokio::net::TcpListener,
acceptor: TokioTcpAcceptor,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let connections = tokio_util::task::task_tracker::TaskTracker::new();
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
run_until_cancelled(acceptor.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
@@ -172,10 +169,6 @@ async fn task_main(
connections.spawn(
async move {
socket
.set_nodelay(true)
.context("failed to set socket option")?;
info!(%peer_addr, "serving");
let ctx = RequestContext::new(
session_id,
@@ -197,7 +190,7 @@ async fn task_main(
}
connections.close();
drop(listener);
drop(acceptor);
connections.wait().await;

View File

@@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use futures::future::Either;
@@ -8,9 +9,10 @@ use proxy::auth::backend::jwt::JwkCache;
use proxy::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned};
use proxy::cancellation::{CancelMap, CancellationHandler};
use proxy::config::{
self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, HttpConfig,
self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig,
ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2,
};
use proxy::conn::TokioTcpAcceptor;
use proxy::context::parquet::ParquetUploadArgs;
use proxy::http::health_server::AppMetrics;
use proxy::metrics::Metrics;
@@ -23,9 +25,9 @@ use proxy::redis::{elasticache, notifications};
use proxy::scram::threadpool::ThreadPool;
use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::GlobalConnPoolOptions;
use proxy::tls::client_config::compute_client_config_with_root_certs;
use proxy::{auth, control_plane, http, serverless, usage_metrics};
use remote_storage::RemoteStorageConfig;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -129,9 +131,6 @@ struct ProxyCliArgs {
/// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
#[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
connect_compute_lock: String,
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
#[clap(flatten)]
sql_over_http: SqlOverHttpArgs,
/// timeout for scram authentication protocol
@@ -354,17 +353,17 @@ async fn main() -> anyhow::Result<()> {
// Check that we can bind to address before further initialization
let http_address: SocketAddr = args.http.parse()?;
info!("Starting http on {http_address}");
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
let http_listener = TokioTcpAcceptor::bind(http_address).await?;
let mgmt_address: SocketAddr = args.mgmt.parse()?;
info!("Starting mgmt on {mgmt_address}");
let mgmt_listener = TcpListener::bind(mgmt_address).await?;
let mgmt_listener = TokioTcpAcceptor::bind(mgmt_address).await?;
let proxy_listener = if !args.is_auth_broker {
let proxy_address: SocketAddr = args.proxy.parse()?;
info!("Starting proxy on {proxy_address}");
Some(TcpListener::bind(proxy_address).await?)
Some(TokioTcpAcceptor::bind(proxy_address).await?)
} else {
None
};
@@ -374,7 +373,7 @@ async fn main() -> anyhow::Result<()> {
let serverless_listener = if let Some(serverless_address) = args.wss {
let serverless_address: SocketAddr = serverless_address.parse()?;
info!("Starting wss on {serverless_address}");
Some(TcpListener::bind(serverless_address).await?)
Some(TokioTcpAcceptor::bind(serverless_address).await?)
} else if args.is_auth_broker {
bail!("wss arg must be present for auth-broker")
} else {
@@ -400,6 +399,7 @@ async fn main() -> anyhow::Result<()> {
let cancellation_handler = Arc::new(CancellationHandler::<
Option<Arc<Mutex<RedisPublisherClient>>>,
>::new(
&config.connect_to_compute,
cancel_map.clone(),
redis_publisher,
proxy::metrics::CancellationSource::FromClient,
@@ -495,6 +495,7 @@ async fn main() -> anyhow::Result<()> {
let cache = api.caches.project_info.clone();
if let Some(client) = client1 {
maintenance_tasks.spawn(notifications::task_main(
config,
client,
cache.clone(),
cancel_map.clone(),
@@ -503,6 +504,7 @@ async fn main() -> anyhow::Result<()> {
}
if let Some(client) = client2 {
maintenance_tasks.spawn(notifications::task_main(
config,
client,
cache.clone(),
cancel_map.clone(),
@@ -564,9 +566,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
if args.allow_self_signed_compute {
warn!("allowing self-signed compute certificates");
}
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
interval: args.metric_backup_collection_interval,
remote_storage_config: args.metric_backup_collection_remote_storage.clone(),
@@ -638,10 +637,15 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
console_redirect_confirmation_timeout: args.webauth_confirmation_timeout,
};
let compute_config = ComputeConfig {
retry: config::RetryConfig::parse(&args.connect_to_compute_retry)?,
tls: Arc::new(compute_client_config_with_root_certs()?),
timeout: Duration::from_secs(2),
};
let config = ProxyConfig {
tls_config,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
proxy_protocol_v2: args.proxy_protocol_v2,
@@ -649,9 +653,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
region: args.region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_compute_locks,
connect_to_compute_retry_config: config::RetryConfig::parse(
&args.connect_to_compute_retry,
)?,
connect_to_compute: compute_config,
};
let config = Box::leak(Box::new(config));

View File

@@ -3,10 +3,9 @@ use std::sync::Arc;
use dashmap::DashMap;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use once_cell::sync::OnceCell;
use postgres_client::{tls::MakeTlsConnect, CancelToken};
use postgres_client::tls::MakeTlsConnect;
use postgres_client::CancelToken;
use pq_proto::CancelKeyData;
use rustls::crypto::ring;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
@@ -14,6 +13,7 @@ use tracing::{debug, info};
use uuid::Uuid;
use crate::auth::{check_peer_addr_is_in_list, IpPattern};
use crate::config::ComputeConfig;
use crate::error::ReportableError;
use crate::ext::LockExt;
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
@@ -21,9 +21,7 @@ use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::cancellation_publisher::{
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
};
use crate::compute::{load_certs, AcceptEverythingVerifier};
use crate::postgres_rustls::MakeRustlsConnect;
use crate::tls::postgres_rustls::MakeRustlsConnect;
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
@@ -35,6 +33,7 @@ type IpSubnetKey = IpNet;
///
/// If `CancellationPublisher` is available, cancel request will be used to publish the cancellation key to other proxy instances.
pub struct CancellationHandler<P> {
compute_config: &'static ComputeConfig,
map: CancelMap,
client: P,
/// This field used for the monitoring purposes.
@@ -183,7 +182,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
"cancelling query per user's request using key {key}, hostname {}, address: {}",
cancel_closure.hostname, cancel_closure.socket_addr
);
cancel_closure.try_cancel_query().await
cancel_closure.try_cancel_query(self.compute_config).await
}
#[cfg(test)]
@@ -198,8 +197,13 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
}
impl CancellationHandler<()> {
pub fn new(map: CancelMap, from: CancellationSource) -> Self {
pub fn new(
compute_config: &'static ComputeConfig,
map: CancelMap,
from: CancellationSource,
) -> Self {
Self {
compute_config,
map,
client: (),
from,
@@ -214,8 +218,14 @@ impl CancellationHandler<()> {
}
impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
pub fn new(map: CancelMap, client: Option<Arc<Mutex<P>>>, from: CancellationSource) -> Self {
pub fn new(
compute_config: &'static ComputeConfig,
map: CancelMap,
client: Option<Arc<Mutex<P>>>,
from: CancellationSource,
) -> Self {
Self {
compute_config,
map,
client,
from,
@@ -229,8 +239,6 @@ impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
}
}
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
/// This should've been a [`std::future::Future`], but
/// it's impossible to name a type of an unboxed future
/// (we'd need something like `#![feature(type_alias_impl_trait)]`).
@@ -240,7 +248,6 @@ pub struct CancelClosure {
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
hostname: String, // for pg_sni router
allow_self_signed_compute: bool,
}
impl CancelClosure {
@@ -249,47 +256,23 @@ impl CancelClosure {
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
hostname: String,
allow_self_signed_compute: bool,
) -> Self {
Self {
socket_addr,
cancel_token,
ip_allowlist,
hostname,
allow_self_signed_compute,
}
}
/// Cancels the query running on user's compute node.
pub(crate) async fn try_cancel_query(self) -> Result<(), CancelError> {
pub(crate) async fn try_cancel_query(
self,
compute_config: &ComputeConfig,
) -> Result<(), CancelError> {
let socket = TcpStream::connect(self.socket_addr).await?;
let client_config = if self.allow_self_signed_compute {
// Allow all certificates for creating the connection. Used only for tests
let verifier = Arc::new(AcceptEverythingVerifier);
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.dangerous()
.with_custom_certificate_verifier(verifier)
} else {
let root_store = TLS_ROOTS
.get_or_try_init(load_certs)
.map_err(|_e| {
CancelError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
"TLS root store initialization failed".to_string(),
))
})?
.clone();
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
};
let client_config = client_config.with_no_client_auth();
let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
let mut mk_tls =
crate::tls::postgres_rustls::MakeRustlsConnect::new(compute_config.tls.clone());
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
&self.hostname,
@@ -341,11 +324,30 @@ impl<P> Drop for Session<P> {
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::time::Duration;
use super::*;
use crate::config::RetryConfig;
use crate::tls::client_config::compute_client_config_with_certs;
fn config() -> ComputeConfig {
let retry = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
ComputeConfig {
retry,
tls: Arc::new(compute_client_config_with_certs(std::iter::empty())),
timeout: Duration::from_secs(2),
}
}
#[tokio::test]
async fn check_session_drop() -> anyhow::Result<()> {
let cancellation_handler = Arc::new(CancellationHandler::<()>::new(
Box::leak(Box::new(config())),
CancelMap::default(),
CancellationSource::FromRedis,
));
@@ -361,8 +363,11 @@ mod tests {
#[tokio::test]
async fn cancel_session_noop_regression() {
let handler =
CancellationHandler::<()>::new(CancelMap::default(), CancellationSource::Local);
let handler = CancellationHandler::<()>::new(
Box::leak(Box::new(config())),
CancelMap::default(),
CancellationSource::Local,
);
handler
.cancel_session(
CancelKeyData {

View File

@@ -1,17 +1,13 @@
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use postgres_client::tls::MakeTlsConnect;
use postgres_client::{CancelToken, RawConnection};
use postgres_protocol::message::backend::NoticeResponseBody;
use pq_proto::StartupMessageParams;
use rustls::client::danger::ServerCertVerifier;
use rustls::crypto::ring;
use rustls::pki_types::InvalidDnsNameError;
use thiserror::Error;
use tokio::net::TcpStream;
@@ -19,14 +15,15 @@ use tracing::{debug, error, info, warn};
use crate::auth::parse_endpoint_param;
use crate::cancellation::CancelClosure;
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::client::ApiLockError;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumDbConnectionsGuard};
use crate::postgres_rustls::MakeRustlsConnect;
use crate::proxy::neon_option;
use crate::tls::postgres_rustls::MakeRustlsConnect;
use crate::types::Host;
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
@@ -41,9 +38,6 @@ pub(crate) enum ConnectionError {
#[error("{COULD_NOT_CONNECT}: {0}")]
CouldNotConnect(#[from] io::Error),
#[error("Couldn't load native TLS certificates: {0:?}")]
TlsCertificateError(Vec<rustls_native_certs::Error>),
#[error("{COULD_NOT_CONNECT}: {0}")]
TlsError(#[from] InvalidDnsNameError),
@@ -90,7 +84,6 @@ impl ReportableError for ConnectionError {
}
ConnectionError::Postgres(_) => crate::error::ErrorKind::Compute,
ConnectionError::CouldNotConnect(_) => crate::error::ErrorKind::Compute,
ConnectionError::TlsCertificateError(_) => crate::error::ErrorKind::Service,
ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute,
ConnectionError::WakeComputeError(e) => e.get_error_kind(),
ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(),
@@ -200,11 +193,15 @@ impl ConnCfg {
let connect_once = |host, port| {
debug!("trying to connect to compute node at {host}:{port}");
connect_with_timeout(host, port).and_then(|socket| async {
let socket_addr = socket.peer_addr()?;
connect_with_timeout(host, port).and_then(|stream| async {
let socket_addr = stream.peer_addr()?;
let socket = socket2::SockRef::from(&stream);
// Disable Nagle's algorithm to not introduce latency between
// client and compute.
socket.set_nodelay(true)?;
// This prevents load balancer from severing the connection.
socket2::SockRef::from(&socket).set_keepalive(true)?;
Ok((socket_addr, socket))
socket.set_keepalive(true)?;
Ok((socket_addr, stream))
})
};
@@ -251,35 +248,14 @@ impl ConnCfg {
pub(crate) async fn connect(
&self,
ctx: &RequestContext,
allow_self_signed_compute: bool,
aux: MetricsAuxInfo,
timeout: Duration,
config: &ComputeConfig,
) -> Result<PostgresConnection, ConnectionError> {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
let (socket_addr, stream, host) = self.connect_raw(config.timeout).await?;
drop(pause);
let client_config = if allow_self_signed_compute {
// Allow all certificates for creating the connection
let verifier = Arc::new(AcceptEverythingVerifier);
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.dangerous()
.with_custom_certificate_verifier(verifier)
} else {
let root_store = TLS_ROOTS
.get_or_try_init(load_certs)
.map_err(ConnectionError::TlsCertificateError)?
.clone();
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
};
let client_config = client_config.with_no_client_auth();
let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
let mut mk_tls = crate::tls::postgres_rustls::MakeRustlsConnect::new(config.tls.clone());
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
host,
@@ -320,7 +296,6 @@ impl ConnCfg {
},
vec![],
host.to_string(),
allow_self_signed_compute,
);
let connection = PostgresConnection {
@@ -352,63 +327,6 @@ fn filtered_options(options: &str) -> Option<String> {
Some(options)
}
pub(crate) fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_native_certs::Error>> {
let der_certs = rustls_native_certs::load_native_certs();
if !der_certs.errors.is_empty() {
return Err(der_certs.errors);
}
let mut store = rustls::RootCertStore::empty();
store.add_parsable_certificates(der_certs.certs);
Ok(Arc::new(store))
}
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
#[derive(Debug)]
pub(crate) struct AcceptEverythingVerifier;
impl ServerCertVerifier for AcceptEverythingVerifier {
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
use rustls::SignatureScheme;
// The schemes for which `SignatureScheme::supported_in_tls13` returns true.
vec![
SignatureScheme::ECDSA_NISTP521_SHA512,
SignatureScheme::ECDSA_NISTP384_SHA384,
SignatureScheme::ECDSA_NISTP256_SHA256,
SignatureScheme::RSA_PSS_SHA512,
SignatureScheme::RSA_PSS_SHA384,
SignatureScheme::RSA_PSS_SHA256,
SignatureScheme::ED25519,
]
}
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,17 +1,10 @@
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{bail, ensure, Context, Ok};
use clap::ValueEnum;
use itertools::Itertools;
use remote_storage::RemoteStorageConfig;
use rustls::crypto::ring::{self, sign};
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use sha2::{Digest, Sha256};
use tracing::{error, info};
use x509_parser::oid_registry;
use crate::auth::backend::jwt::JwkCache;
use crate::auth::backend::AuthRateLimiter;
@@ -20,12 +13,12 @@ use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}
use crate::scram::threadpool::ThreadPool;
use crate::serverless::cancel_set::CancelSet;
use crate::serverless::GlobalConnPoolOptions;
pub use crate::tls::server_config::{configure_tls, TlsConfig};
use crate::types::Host;
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
pub proxy_protocol_v2: ProxyProtocolV2,
@@ -33,7 +26,13 @@ pub struct ProxyConfig {
pub handshake_timeout: Duration,
pub wake_compute_retry_config: RetryConfig,
pub connect_compute_locks: ApiLocks<Host>,
pub connect_to_compute_retry_config: RetryConfig,
pub connect_to_compute: ComputeConfig,
}
pub struct ComputeConfig {
pub retry: RetryConfig,
pub tls: Arc<rustls::ClientConfig>,
pub timeout: Duration,
}
#[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]
@@ -53,12 +52,6 @@ pub struct MetricCollectionConfig {
pub backup_metric_collection_config: MetricBackupCollectionConfig,
}
pub struct TlsConfig {
pub config: Arc<rustls::ServerConfig>,
pub common_names: HashSet<String>,
pub cert_resolver: Arc<CertResolver>,
}
pub struct HttpConfig {
pub accept_websockets: bool,
pub pool_options: GlobalConnPoolOptions,
@@ -81,272 +74,6 @@ pub struct AuthenticationConfig {
pub console_redirect_confirmation_timeout: tokio::time::Duration,
}
impl TlsConfig {
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
self.config.clone()
}
}
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L159>
pub const PG_ALPN_PROTOCOL: &[u8] = b"postgresql";
/// Configure TLS for the main endpoint.
pub fn configure_tls(
key_path: &str,
cert_path: &str,
certs_dir: Option<&String>,
allow_tls_keylogfile: bool,
) -> anyhow::Result<TlsConfig> {
let mut cert_resolver = CertResolver::new();
// add default certificate
cert_resolver.add_cert_path(key_path, cert_path, true)?;
// add extra certificates
if let Some(certs_dir) = certs_dir {
for entry in std::fs::read_dir(certs_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
// file names aligned with default cert-manager names
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver.add_cert_path(
&key_path.to_string_lossy(),
&cert_path.to_string_lossy(),
false,
)?;
}
}
}
}
let common_names = cert_resolver.get_common_names();
let cert_resolver = Arc::new(cert_resolver);
// allow TLS 1.2 to be compatible with older client libraries
let mut config =
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
.context("ring should support TLS1.2 and TLS1.3")?
.with_no_client_auth()
.with_cert_resolver(cert_resolver.clone());
config.alpn_protocols = vec![PG_ALPN_PROTOCOL.to_vec()];
if allow_tls_keylogfile {
// KeyLogFile will check for the SSLKEYLOGFILE environment variable.
config.key_log = Arc::new(rustls::KeyLogFile::new());
}
Ok(TlsConfig {
config: Arc::new(config),
common_names,
cert_resolver,
})
}
/// Channel binding parameter
///
/// <https://www.rfc-editor.org/rfc/rfc5929#section-4>
/// Description: The hash of the TLS server's certificate as it
/// appears, octet for octet, in the server's Certificate message. Note
/// that the Certificate message contains a certificate_list, in which
/// the first element is the server's certificate.
///
/// The hash function is to be selected as follows:
///
/// * if the certificate's signatureAlgorithm uses a single hash
/// function, and that hash function is either MD5 or SHA-1, then use SHA-256;
///
/// * if the certificate's signatureAlgorithm uses a single hash
/// function and that hash function neither MD5 nor SHA-1, then use
/// the hash function associated with the certificate's
/// signatureAlgorithm;
///
/// * if the certificate's signatureAlgorithm uses no hash functions or
/// uses multiple hash functions, then this channel binding type's
/// channel bindings are undefined at this time (updates to is channel
/// binding type may occur to address this issue if it ever arises).
#[derive(Debug, Clone, Copy)]
pub enum TlsServerEndPoint {
Sha256([u8; 32]),
Undefined,
}
impl TlsServerEndPoint {
pub fn new(cert: &CertificateDer<'_>) -> anyhow::Result<Self> {
let sha256_oids = [
// I'm explicitly not adding MD5 or SHA1 here... They're bad.
oid_registry::OID_SIG_ECDSA_WITH_SHA256,
oid_registry::OID_PKCS1_SHA256WITHRSA,
];
let pem = x509_parser::parse_x509_certificate(cert)
.context("Failed to parse PEM object from cerficiate")?
.1;
info!(subject = %pem.subject, "parsing TLS certificate");
let reg = oid_registry::OidRegistry::default().with_all_crypto();
let oid = pem.signature_algorithm.oid();
let alg = reg.get(oid);
if sha256_oids.contains(oid) {
let tls_server_end_point: [u8; 32] = Sha256::new().chain_update(cert).finalize().into();
info!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
Ok(Self::Sha256(tls_server_end_point))
} else {
error!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), "unknown channel binding");
Ok(Self::Undefined)
}
}
pub fn supported(&self) -> bool {
!matches!(self, TlsServerEndPoint::Undefined)
}
}
#[derive(Default, Debug)]
pub struct CertResolver {
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
}
impl CertResolver {
pub fn new() -> Self {
Self::default()
}
fn add_cert_path(
&mut self,
key_path: &str,
cert_path: &str,
is_default: bool,
) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
};
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
})?
};
self.add_cert(priv_key, cert_chain, is_default)
}
pub fn add_cert(
&mut self,
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
is_default: bool,
) -> anyhow::Result<()> {
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let first_cert = &cert_chain[0];
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let pem = x509_parser::parse_x509_certificate(first_cert)
.context("Failed to parse PEM object from cerficiate")?
.1;
let common_name = pem.subject().to_string();
// We need to get the canonical name for this certificate so we can match them against any domain names
// seen within the proxy codebase.
//
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
// We need to remove the wildcard prefix for the purposes of certificate selection.
//
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
//
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
// validation, so let's we can continue with any common-name
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=") {
s.to_string()
} else {
bail!("Failed to parse common name from certificate")
};
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
if is_default {
self.default = Some((cert.clone(), tls_server_end_point));
}
self.certs.insert(common_name, (cert, tls_server_end_point));
Ok(())
}
pub fn get_common_names(&self) -> HashSet<String> {
self.certs.keys().map(|s| s.to_string()).collect()
}
}
impl rustls::server::ResolvesServerCert for CertResolver {
fn resolve(
&self,
client_hello: rustls::server::ClientHello<'_>,
) -> Option<Arc<rustls::sign::CertifiedKey>> {
self.resolve(client_hello.server_name()).map(|x| x.0)
}
}
impl CertResolver {
pub fn resolve(
&self,
server_name: Option<&str>,
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
// loop here and cut off more and more subdomains until we find
// a match to get a proper wildcard support. OTOH, we now do not
// use nested domains, so keep this simple for now.
//
// With the current coding foo.com will match *.foo.com and that
// repeats behavior of the old code.
if let Some(mut sni_name) = server_name {
loop {
if let Some(cert) = self.certs.get(sni_name) {
return Some(cert.clone());
}
if let Some((_, rest)) = sni_name.split_once('.') {
sni_name = rest;
} else {
return None;
}
}
} else {
// No SNI, use the default certificate, otherwise we can't get to
// options parameter which can be used to set endpoint name too.
// That means that non-SNI flow will not work for CNAME domains in
// verify-full mode.
//
// If that will be a problem we can:
//
// a) Instead of multi-cert approach use single cert with extra
// domains listed in Subject Alternative Name (SAN).
// b) Deploy separate proxy instances for extra domains.
self.default.clone()
}
}
}
#[derive(Debug)]
pub struct EndpointCacheConfig {
/// Batch size to receive all endpoints on the startup.

221
proxy/src/conn.rs Normal file
View File

@@ -0,0 +1,221 @@
use std::future::{poll_fn, Future};
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
pub trait Acceptor {
type Connection: AsyncRead + AsyncWrite + Send + Unpin + 'static;
type Error: std::error::Error + Send + Sync + 'static;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let _ = cx;
Poll::Ready(Ok(()))
}
fn accept(
&self,
) -> impl Future<Output = Result<(Self::Connection, SocketAddr), Self::Error>> + Send;
}
pub trait Connector {
type Connection: AsyncRead + AsyncWrite + Send + Unpin + 'static;
type Error: std::error::Error + Send + Sync + 'static;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let _ = cx;
Poll::Ready(Ok(()))
}
fn connect(
&self,
addr: SocketAddr,
) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send;
}
pub struct TokioTcpAcceptor {
listener: TcpListener,
tcp_nodelay: Option<bool>,
tcp_keepalive: Option<bool>,
}
impl TokioTcpAcceptor {
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
let listener = TcpListener::bind(addr).await?;
// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
Ok(Self {
listener,
tcp_nodelay: Some(true),
tcp_keepalive: None,
})
}
pub fn into_std(self) -> io::Result<std::net::TcpListener> {
self.listener.into_std()
}
}
impl Acceptor for TokioTcpAcceptor {
type Connection = TcpStream;
type Error = io::Error;
fn accept(&self) -> impl Future<Output = Result<(Self::Connection, SocketAddr), Self::Error>> {
async move {
let (stream, addr) = self.listener.accept().await?;
let socket = socket2::SockRef::from(&stream);
if let Some(nodelay) = self.tcp_nodelay {
socket.set_nodelay(nodelay)?;
}
if let Some(keepalive) = self.tcp_keepalive {
socket.set_keepalive(keepalive)?;
}
Ok((stream, addr))
}
}
}
pub struct TokioTcpConnector;
impl Connector for TokioTcpConnector {
type Connection = TcpStream;
type Error = io::Error;
fn connect(
&self,
addr: SocketAddr,
) -> impl Future<Output = Result<Self::Connection, Self::Error>> {
async move {
let socket = TcpStream::connect(addr).await?;
socket.set_nodelay(true)?;
Ok(socket)
}
}
}
pub trait Stream: AsyncRead + AsyncWrite + Send + Unpin + 'static {}
impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> Stream for T {}
pub trait AsyncRead {
fn readable(&self) -> impl Future<Output = io::Result<()>> + Send
where
Self: Send + Sync,
{
poll_fn(move |cx| self.poll_read_ready(cx))
}
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>>;
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [io::IoSliceMut<'_>],
) -> Poll<io::Result<usize>>;
}
pub trait AsyncWrite {
fn writable(&self) -> impl Future<Output = io::Result<()>> + Send
where
Self: Send + Sync,
{
poll_fn(move |cx| self.poll_write_ready(cx))
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>>;
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
impl AsyncRead for tokio::net::TcpStream {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
tokio::net::TcpStream::poll_read_ready(self, cx)
}
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match tokio::net::TcpStream::try_read(Pin::new(&mut *self).get_mut(), buf) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
cx.waker().wake_by_ref();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
fn poll_read_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [io::IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
match tokio::net::TcpStream::try_read_vectored(Pin::new(&mut *self).get_mut(), bufs) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
cx.waker().wake_by_ref();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
impl AsyncWrite for tokio::net::TcpStream {
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
tokio::net::TcpStream::poll_write_ready(self, cx)
}
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
<Self as tokio::io::AsyncWrite>::poll_write(self, cx, buf)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
<Self as tokio::io::AsyncWrite>::poll_write_vectored(self, cx, bufs)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
<Self as tokio::io::AsyncWrite>::poll_flush(self, cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
<Self as tokio::io::AsyncWrite>::poll_shutdown(self, cx)
}
}

View File

@@ -8,6 +8,7 @@ use tracing::{debug, error, info, Instrument};
use crate::auth::backend::ConsoleRedirectBackend;
use crate::cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal};
use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::conn::{Acceptor, TokioTcpAcceptor};
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard};
@@ -22,7 +23,7 @@ use crate::proxy::{
pub async fn task_main(
config: &'static ProxyConfig,
backend: &'static ConsoleRedirectBackend,
listener: tokio::net::TcpListener,
acceptor: TokioTcpAcceptor,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
) -> anyhow::Result<()> {
@@ -30,15 +31,11 @@ pub async fn task_main(
info!("proxy has shut down");
}
// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let connections = tokio_util::task::task_tracker::TaskTracker::new();
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
run_until_cancelled(acceptor.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
@@ -115,7 +112,7 @@ pub async fn task_main(
Ok(Some(p)) => {
ctx.set_success();
let _disconnect = ctx.log_connect();
match p.proxy_pass().await {
match p.proxy_pass(&config.connect_to_compute).await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
error!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
@@ -131,7 +128,7 @@ pub async fn task_main(
connections.close();
cancellations.close();
drop(listener);
drop(acceptor);
// Drain connections
connections.wait().await;
@@ -213,11 +210,10 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
params_compat: true,
params: &params,
locks: &config.connect_compute_locks,
allow_self_signed_compute: config.allow_self_signed_compute,
},
&user_info,
config.wake_compute_retry_config,
config.connect_to_compute_retry_config,
&config.connect_to_compute,
)
.or_else(|e| stream.throw_error(e))
.await?;

View File

@@ -4,10 +4,11 @@ use anyhow::Context;
use once_cell::sync::Lazy;
use postgres_backend::{AuthType, PostgresBackend, PostgresBackendTCP, QueryError};
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use tokio::net::{TcpListener, TcpStream};
use tokio::net::TcpStream;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, Instrument};
use crate::conn::{Acceptor, TokioTcpAcceptor};
use crate::control_plane::messages::{DatabaseInfo, KickSession};
use crate::waiters::{self, Waiter, Waiters};
@@ -26,19 +27,15 @@ pub(crate) fn notify(psql_session_id: &str, msg: ComputeReady) -> Result<(), wai
/// Management API listener task.
/// It spawns management response handlers needed for the console redirect auth flow.
pub async fn task_main(listener: TcpListener) -> anyhow::Result<Infallible> {
pub async fn task_main(acceptor: TokioTcpAcceptor) -> anyhow::Result<Infallible> {
scopeguard::defer! {
info!("mgmt has shut down");
}
loop {
let (socket, peer_addr) = listener.accept().await?;
let (socket, peer_addr) = acceptor.accept().await?;
info!("accepted connection from {peer_addr}");
socket
.set_nodelay(true)
.context("failed to set client socket option")?;
let span = info_span!("mgmt", peer = %peer_addr);
tokio::task::spawn(

View File

@@ -10,13 +10,13 @@ pub mod client;
pub(crate) mod errors;
use std::sync::Arc;
use std::time::Duration;
use crate::auth::backend::jwt::AuthRule;
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::IpPattern;
use crate::cache::project_info::ProjectInfoCacheImpl;
use crate::cache::{Cached, TimedLru};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
use crate::intern::ProjectIdInt;
@@ -73,12 +73,9 @@ impl NodeInfo {
pub(crate) async fn connect(
&self,
ctx: &RequestContext,
allow_self_signed_compute: bool,
timeout: Duration,
config: &ComputeConfig,
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.config
.connect(ctx, allow_self_signed_compute, self.aux.clone(), timeout)
.await
self.config.connect(ctx, self.aux.clone(), config).await
}
pub(crate) fn reuse_settings(&mut self, other: Self) {

View File

@@ -1,5 +1,4 @@
use std::convert::Infallible;
use std::net::TcpListener;
use std::sync::{Arc, Mutex};
use anyhow::{anyhow, bail};
@@ -14,6 +13,7 @@ use utils::http::error::ApiError;
use utils::http::json::json_response;
use utils::http::{RouterBuilder, RouterService};
use crate::conn::TokioTcpAcceptor;
use crate::ext::{LockExt, TaskExt};
use crate::jemalloc;
@@ -36,7 +36,7 @@ fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
}
pub async fn task_main(
http_listener: TcpListener,
http_acceptor: TokioTcpAcceptor,
metrics: AppMetrics,
) -> anyhow::Result<Infallible> {
scopeguard::defer! {
@@ -45,7 +45,7 @@ pub async fn task_main(
let service = || RouterService::new(make_router(metrics).build()?);
hyper0::Server::from_tcp(http_listener)?
hyper0::Server::from_tcp(http_acceptor.into_std()?)?
.serve(service().map_err(|e| anyhow!(e))?)
.await?;

View File

@@ -78,6 +78,7 @@ pub mod cancellation;
pub mod compute;
pub mod compute_ctl;
pub mod config;
pub mod conn;
pub mod console_redirect_proxy;
pub mod context;
pub mod control_plane;
@@ -89,7 +90,6 @@ pub mod jemalloc;
pub mod logging;
pub mod metrics;
pub mod parse;
pub mod postgres_rustls;
pub mod protocol2;
pub mod proxy;
pub mod rate_limiter;
@@ -99,6 +99,7 @@ pub mod scram;
pub mod serverless;
pub mod signals;
pub mod stream;
pub mod tls;
pub mod types;
pub mod url;
pub mod usage_metrics;

View File

@@ -6,7 +6,7 @@ use tracing::{debug, info, warn};
use super::retry::ShouldRetryWakeCompute;
use crate::auth::backend::ComputeCredentialKeys;
use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT};
use crate::config::RetryConfig;
use crate::config::{ComputeConfig, RetryConfig};
use crate::context::RequestContext;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::locks::ApiLocks;
@@ -19,8 +19,6 @@ use crate::proxy::retry::{retry_after, should_retry, CouldRetry};
use crate::proxy::wake_compute::wake_compute;
use crate::types::Host;
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);
/// If we couldn't connect, a cached connection info might be to blame
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
@@ -49,7 +47,7 @@ pub(crate) trait ConnectMechanism {
&self,
ctx: &RequestContext,
node_info: &control_plane::CachedNodeInfo,
timeout: time::Duration,
config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError>;
fn update_connect_config(&self, conf: &mut compute::ConnCfg);
@@ -73,9 +71,6 @@ pub(crate) struct TcpMechanism<'a> {
/// connect_to_compute concurrency lock
pub(crate) locks: &'static ApiLocks<Host>,
/// Whether we should accept self-signed certificates (for testing)
pub(crate) allow_self_signed_compute: bool,
}
#[async_trait]
@@ -89,15 +84,11 @@ impl ConnectMechanism for TcpMechanism<'_> {
&self,
ctx: &RequestContext,
node_info: &control_plane::CachedNodeInfo,
timeout: time::Duration,
config: &ComputeConfig,
) -> Result<PostgresConnection, Self::Error> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
permit.release_result(
node_info
.connect(ctx, self.allow_self_signed_compute, timeout)
.await,
)
permit.release_result(node_info.connect(ctx, config).await)
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
@@ -112,7 +103,7 @@ pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBac
mechanism: &M,
user_info: &B,
wake_compute_retry_config: RetryConfig,
connect_to_compute_retry_config: RetryConfig,
compute: &ComputeConfig,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
@@ -126,10 +117,7 @@ where
mechanism.update_connect_config(&mut node_info.config);
// try once
let err = match mechanism
.connect_once(ctx, &node_info, CONNECT_TIMEOUT)
.await
{
let err = match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
@@ -149,7 +137,7 @@ where
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if should_retry(&err, num_retries, connect_to_compute_retry_config) {
if should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
@@ -179,10 +167,7 @@ where
debug!("wake_compute success. attempting to connect");
num_retries = 1;
loop {
match mechanism
.connect_once(ctx, &node_info, CONNECT_TIMEOUT)
.await
{
match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
@@ -197,7 +182,7 @@ where
return Ok(res);
}
Err(e) => {
if !should_retry(&e, num_retries, connect_to_compute_retry_config) {
if !should_retry(&e, num_retries, compute.retry) {
// Don't log an error here, caller will print the error
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
@@ -213,7 +198,7 @@ where
}
};
let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
let wait_duration = retry_after(num_retries, compute.retry);
num_retries += 1;
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);

View File

@@ -8,12 +8,13 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, info, warn};
use crate::auth::endpoint_sni;
use crate::config::{TlsConfig, PG_ALPN_PROTOCOL};
use crate::config::TlsConfig;
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::Metrics;
use crate::proxy::ERR_INSECURE_CONNECTION;
use crate::stream::{PqStream, Stream, StreamUpgradeError};
use crate::tls::PG_ALPN_PROTOCOL;
#[derive(Error, Debug)]
pub(crate) enum HandshakeError {

View File

@@ -25,6 +25,7 @@ use self::connect_compute::{connect_to_compute, TcpMechanism};
use self::passthrough::ProxyPassthrough;
use crate::cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal};
use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
use crate::conn::{Acceptor, TokioTcpAcceptor};
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard};
@@ -55,7 +56,7 @@ pub async fn run_until_cancelled<F: std::future::Future>(
pub async fn task_main(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
listener: tokio::net::TcpListener,
acceptor: TokioTcpAcceptor,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
@@ -64,15 +65,11 @@ pub async fn task_main(
info!("proxy has shut down");
}
// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let connections = tokio_util::task::task_tracker::TaskTracker::new();
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
run_until_cancelled(acceptor.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
@@ -152,7 +149,7 @@ pub async fn task_main(
Ok(Some(p)) => {
ctx.set_success();
let _disconnect = ctx.log_connect();
match p.proxy_pass().await {
match p.proxy_pass(&config.connect_to_compute).await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
warn!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
@@ -168,7 +165,7 @@ pub async fn task_main(
connections.close();
cancellations.close();
drop(listener);
drop(acceptor);
// Drain connections
connections.wait().await;
@@ -348,12 +345,10 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
params_compat,
params: &params,
locks: &config.connect_compute_locks,
// only used for console redirect testing.
allow_self_signed_compute: false,
},
&user_info,
config.wake_compute_retry_config,
config.connect_to_compute_retry_config,
&config.connect_to_compute,
)
.or_else(|e| stream.throw_error(e))
.await?;

View File

@@ -5,6 +5,7 @@ use utils::measured_stream::MeasuredStream;
use super::copy_bidirectional::ErrorSource;
use crate::cancellation;
use crate::compute::PostgresConnection;
use crate::config::ComputeConfig;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
use crate::stream::Stream;
@@ -67,9 +68,17 @@ pub(crate) struct ProxyPassthrough<P, S> {
}
impl<P, S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<P, S> {
pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
pub(crate) async fn proxy_pass(
self,
compute_config: &ComputeConfig,
) -> Result<(), ErrorSource> {
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
if let Err(err) = self.compute.cancel_closure.try_cancel_query().await {
if let Err(err) = self
.compute
.cancel_closure
.try_cancel_query(compute_config)
.await
{
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
}
res

View File

@@ -22,14 +22,16 @@ use super::*;
use crate::auth::backend::{
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned,
};
use crate::config::{CertResolver, RetryConfig};
use crate::config::{ComputeConfig, RetryConfig};
use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
use crate::control_plane::{
self, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, NodeInfo, NodeInfoCache,
};
use crate::error::ErrorKind;
use crate::postgres_rustls::MakeRustlsConnect;
use crate::tls::client_config::compute_client_config_with_certs;
use crate::tls::postgres_rustls::MakeRustlsConnect;
use crate::tls::server_config::CertResolver;
use crate::types::{BranchId, EndpointId, ProjectId};
use crate::{sasl, scram};
@@ -67,7 +69,7 @@ fn generate_certs(
}
struct ClientConfig<'a> {
config: rustls::ClientConfig,
config: Arc<rustls::ClientConfig>,
hostname: &'a str,
}
@@ -110,16 +112,7 @@ fn generate_tls_config<'a>(
};
let client_config = {
let config =
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.context("ring should support the default protocol versions")?
.with_root_certificates({
let mut store = rustls::RootCertStore::empty();
store.add(ca)?;
store
})
.with_no_client_auth();
let config = Arc::new(compute_client_config_with_certs([ca]));
ClientConfig { config, hostname }
};
@@ -468,7 +461,7 @@ impl ConnectMechanism for TestConnectMechanism {
&self,
_ctx: &RequestContext,
_node_info: &control_plane::CachedNodeInfo,
_timeout: std::time::Duration,
_config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError> {
let mut counter = self.counter.lock().unwrap();
let action = self.sequence[*counter];
@@ -576,6 +569,20 @@ fn helper_create_connect_info(
user_info
}
fn config() -> ComputeConfig {
let retry = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
ComputeConfig {
retry,
tls: Arc::new(compute_client_config_with_certs(std::iter::empty())),
timeout: Duration::from_secs(2),
}
}
#[tokio::test]
async fn connect_to_compute_success() {
let _ = env_logger::try_init();
@@ -583,12 +590,8 @@ async fn connect_to_compute_success() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap();
mechanism.verify();
@@ -601,12 +604,8 @@ async fn connect_to_compute_retry() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap();
mechanism.verify();
@@ -620,12 +619,8 @@ async fn connect_to_compute_non_retry_1() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap_err();
mechanism.verify();
@@ -639,12 +634,8 @@ async fn connect_to_compute_non_retry_2() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap();
mechanism.verify();
@@ -665,17 +656,13 @@ async fn connect_to_compute_non_retry_3() {
max_retries: 1,
backoff_factor: 2.0,
};
let connect_to_compute_retry_config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
let config = config();
connect_to_compute(
&ctx,
&mechanism,
&user_info,
wake_compute_retry_config,
connect_to_compute_retry_config,
&config,
)
.await
.unwrap_err();
@@ -690,12 +677,8 @@ async fn wake_retry() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap();
mechanism.verify();
@@ -709,12 +692,8 @@ async fn wake_non_retry() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap_err();
mechanism.verify();

View File

@@ -12,6 +12,7 @@ use uuid::Uuid;
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::cache::project_info::ProjectInfoCache;
use crate::cancellation::{CancelMap, CancellationHandler};
use crate::config::ProxyConfig;
use crate::intern::{ProjectIdInt, RoleNameInt};
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
@@ -39,6 +40,27 @@ pub(crate) enum Notification {
AllowedIpsUpdate {
allowed_ips_update: AllowedIpsUpdate,
},
#[serde(
rename = "/block_public_or_vpc_access_updated",
deserialize_with = "deserialize_json_string"
)]
BlockPublicOrVpcAccessUpdated {
block_public_or_vpc_access_updated: BlockPublicOrVpcAccessUpdated,
},
#[serde(
rename = "/allowed_vpc_endpoints_updated_for_org",
deserialize_with = "deserialize_json_string"
)]
AllowedVpcEndpointsUpdatedForOrg {
allowed_vpc_endpoints_updated_for_org: AllowedVpcEndpointsUpdatedForOrg,
},
#[serde(
rename = "/allowed_vpc_endpoints_updated_for_projects",
deserialize_with = "deserialize_json_string"
)]
AllowedVpcEndpointsUpdatedForProjects {
allowed_vpc_endpoints_updated_for_projects: AllowedVpcEndpointsUpdatedForProjects,
},
#[serde(
rename = "/password_updated",
deserialize_with = "deserialize_json_string"
@@ -51,6 +73,24 @@ pub(crate) enum Notification {
pub(crate) struct AllowedIpsUpdate {
project_id: ProjectIdInt,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct BlockPublicOrVpcAccessUpdated {
project_id: ProjectIdInt,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct AllowedVpcEndpointsUpdatedForOrg {
// TODO: change type once the implementation is more fully fledged.
// See e.g. https://github.com/neondatabase/neon/pull/10073.
account_id: ProjectIdInt,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct AllowedVpcEndpointsUpdatedForProjects {
project_ids: Vec<ProjectIdInt>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct PasswordUpdate {
project_id: ProjectIdInt,
@@ -164,7 +204,11 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
}
}
}
Notification::AllowedIpsUpdate { .. } | Notification::PasswordUpdate { .. } => {
Notification::AllowedIpsUpdate { .. }
| Notification::PasswordUpdate { .. }
| Notification::BlockPublicOrVpcAccessUpdated { .. }
| Notification::AllowedVpcEndpointsUpdatedForOrg { .. }
| Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
invalidate_cache(self.cache.clone(), msg.clone());
if matches!(msg, Notification::AllowedIpsUpdate { .. }) {
Metrics::get()
@@ -177,6 +221,8 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
.redis_events_count
.inc(RedisEventsCount::PasswordUpdate);
}
// TODO: add additional metrics for the other event types.
// It might happen that the invalid entry is on the way to be cached.
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
@@ -203,6 +249,15 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
password_update.role_name,
),
Notification::Cancel(_) => unreachable!("cancel message should be handled separately"),
Notification::BlockPublicOrVpcAccessUpdated { .. } => {
// https://github.com/neondatabase/neon/pull/10073
}
Notification::AllowedVpcEndpointsUpdatedForOrg { .. } => {
// https://github.com/neondatabase/neon/pull/10073
}
Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
// https://github.com/neondatabase/neon/pull/10073
}
}
}
@@ -249,6 +304,7 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
/// Handle console's invalidation messages.
#[tracing::instrument(name = "redis_notifications", skip_all)]
pub async fn task_main<C>(
config: &'static ProxyConfig,
redis: ConnectionWithCredentialsProvider,
cache: Arc<C>,
cancel_map: CancelMap,
@@ -258,6 +314,7 @@ where
C: ProjectInfoCache + Send + Sync + 'static,
{
let cancellation_handler = Arc::new(CancellationHandler::<()>::new(
&config.connect_to_compute,
cancel_map,
crate::metrics::CancellationSource::FromRedis,
));

View File

@@ -50,6 +50,12 @@ impl<S: AsyncWrite + Unpin> SaslStream<'_, S> {
self.stream.write_message(&msg.to_reply()).await?;
Ok(())
}
// Queue a SASL message for the client.
fn send_noflush(&mut self, msg: &ServerMessage<&str>) -> io::Result<()> {
self.stream.write_message_noflush(&msg.to_reply())?;
Ok(())
}
}
/// SASL authentication outcome.
@@ -85,7 +91,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> SaslStream<'_, S> {
continue;
}
Step::Success(result, reply) => {
self.send(&ServerMessage::Final(&reply)).await?;
self.send_noflush(&ServerMessage::Final(&reply))?;
Outcome::Success(result)
}
Step::Failure(reason) => Outcome::Failure(reason),

View File

@@ -13,7 +13,6 @@ use super::secret::ServerSecret;
use super::signature::SignatureBuilder;
use super::threadpool::ThreadPool;
use super::ScramKey;
use crate::config;
use crate::intern::EndpointIdInt;
use crate::sasl::{self, ChannelBinding, Error as SaslError};
@@ -59,14 +58,14 @@ enum ExchangeState {
pub(crate) struct Exchange<'a> {
state: ExchangeState,
secret: &'a ServerSecret,
tls_server_end_point: config::TlsServerEndPoint,
tls_server_end_point: crate::tls::TlsServerEndPoint,
}
impl<'a> Exchange<'a> {
pub(crate) fn new(
secret: &'a ServerSecret,
nonce: fn() -> [u8; SCRAM_RAW_NONCE_LEN],
tls_server_end_point: config::TlsServerEndPoint,
tls_server_end_point: crate::tls::TlsServerEndPoint,
) -> Self {
Self {
state: ExchangeState::Initial(SaslInitial { nonce }),
@@ -120,7 +119,7 @@ impl SaslInitial {
fn transition(
&self,
secret: &ServerSecret,
tls_server_end_point: &config::TlsServerEndPoint,
tls_server_end_point: &crate::tls::TlsServerEndPoint,
input: &str,
) -> sasl::Result<sasl::Step<SaslSentInner, Infallible>> {
let client_first_message = ClientFirstMessage::parse(input)
@@ -155,7 +154,7 @@ impl SaslSentInner {
fn transition(
&self,
secret: &ServerSecret,
tls_server_end_point: &config::TlsServerEndPoint,
tls_server_end_point: &crate::tls::TlsServerEndPoint,
input: &str,
) -> sasl::Result<sasl::Step<Infallible, super::ScramKey>> {
let Self {
@@ -168,8 +167,8 @@ impl SaslSentInner {
.ok_or(SaslError::BadClientMessage("invalid client-final-message"))?;
let channel_binding = cbind_flag.encode(|_| match tls_server_end_point {
config::TlsServerEndPoint::Sha256(x) => Ok(x),
config::TlsServerEndPoint::Undefined => Err(SaslError::MissingBinding),
crate::tls::TlsServerEndPoint::Sha256(x) => Ok(x),
crate::tls::TlsServerEndPoint::Undefined => Err(SaslError::MissingBinding),
})?;
// This might've been caused by a MITM attack

View File

@@ -77,11 +77,8 @@ mod tests {
const NONCE: [u8; 18] = [
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
];
let mut exchange = Exchange::new(
&secret,
|| NONCE,
crate::config::TlsServerEndPoint::Undefined,
);
let mut exchange =
Exchange::new(&secret, || NONCE, crate::tls::TlsServerEndPoint::Undefined);
let client_first = "n,,n=user,r=rOprNGfwEbeRWgbNEkqO";
let client_final = "c=biws,r=rOprNGfwEbeRWgbNEkqOAQIDBAUGBwgJCgsMDQ4PEBES,p=rw1r5Kph5ThxmaUBC2GAQ6MfXbPnNkFiTIvdb/Rear0=";

View File

@@ -22,7 +22,7 @@ use crate::compute;
use crate::compute_ctl::{
ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
};
use crate::config::ProxyConfig;
use crate::config::{ComputeConfig, ProxyConfig};
use crate::context::RequestContext;
use crate::control_plane::client::ApiLockError;
use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
@@ -196,7 +196,7 @@ impl PoolingBackend {
},
&backend,
self.config.wake_compute_retry_config,
self.config.connect_to_compute_retry_config,
&self.config.connect_to_compute,
)
.await
}
@@ -237,7 +237,7 @@ impl PoolingBackend {
},
&backend,
self.config.wake_compute_retry_config,
self.config.connect_to_compute_retry_config,
&self.config.connect_to_compute,
)
.await
}
@@ -502,7 +502,7 @@ impl ConnectMechanism for TokioMechanism {
&self,
ctx: &RequestContext,
node_info: &CachedNodeInfo,
timeout: Duration,
compute_config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
@@ -511,7 +511,7 @@ impl ConnectMechanism for TokioMechanism {
let config = config
.user(&self.conn_info.user_info.user)
.dbname(&self.conn_info.dbname)
.connect_timeout(timeout);
.connect_timeout(compute_config.timeout);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let res = config.connect(postgres_client::NoTls).await;
@@ -552,7 +552,7 @@ impl ConnectMechanism for HyperMechanism {
&self,
ctx: &RequestContext,
node_info: &CachedNodeInfo,
timeout: Duration,
config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
@@ -560,7 +560,7 @@ impl ConnectMechanism for HyperMechanism {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let port = node_info.config.get_port();
let res = connect_http2(&host, port, timeout).await;
let res = connect_http2(&host, port, config.timeout).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;

View File

@@ -35,7 +35,7 @@ use rand::rngs::StdRng;
use rand::SeedableRng;
use sql_over_http::{uuid_to_header_value, NEON_REQUEST_ID};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio::net::TcpStream;
use tokio::time::timeout;
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
@@ -45,6 +45,7 @@ use utils::http::error::ApiError;
use crate::cancellation::CancellationHandlerMain;
use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::conn::{Acceptor, TokioTcpAcceptor};
use crate::context::RequestContext;
use crate::ext::TaskExt;
use crate::metrics::Metrics;
@@ -59,7 +60,7 @@ pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
pub async fn task_main(
config: &'static ProxyConfig,
auth_backend: &'static crate::auth::Backend<'static, ()>,
ws_listener: TcpListener,
ws_acceptor: TokioTcpAcceptor,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
@@ -134,7 +135,7 @@ pub async fn task_main(
connections.close(); // allows `connections.wait to complete`
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
while let Some(res) = run_until_cancelled(ws_listener.accept(), &cancellation_token).await {
while let Some(res) = run_until_cancelled(ws_acceptor.accept(), &cancellation_token).await {
let (conn, peer_addr) = res.context("could not accept TCP stream")?;
if let Err(e) = conn.set_nodelay(true) {
tracing::error!("could not set nodelay: {e}");

View File

@@ -168,7 +168,7 @@ pub(crate) async fn serve_websocket(
Ok(Some(p)) => {
ctx.set_success();
ctx.log_connect();
match p.proxy_pass().await {
match p.proxy_pass(&config.connect_to_compute).await {
Ok(()) => Ok(()),
Err(ErrorSource::Client(err)) => Err(err).context("client"),
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),

View File

@@ -11,9 +11,9 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;
use tracing::debug;
use crate::config::TlsServerEndPoint;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::metrics::Metrics;
use crate::tls::TlsServerEndPoint;
/// Stream wrapper which implements libpq's protocol.
///

View File

@@ -0,0 +1,42 @@
use std::sync::Arc;
use anyhow::bail;
use rustls::crypto::ring;
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
let der_certs = rustls_native_certs::load_native_certs();
if !der_certs.errors.is_empty() {
bail!("could not parse certificates: {:?}", der_certs.errors);
}
let mut store = rustls::RootCertStore::empty();
store.add_parsable_certificates(der_certs.certs);
Ok(Arc::new(store))
}
/// Loads the root certificates and constructs a client config suitable for connecting to the neon compute.
/// This function is blocking.
pub fn compute_client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
Ok(
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(load_certs()?)
.with_no_client_auth(),
)
}
#[cfg(test)]
pub fn compute_client_config_with_certs(
certs: impl IntoIterator<Item = rustls::pki_types::CertificateDer<'static>>,
) -> rustls::ClientConfig {
let mut store = rustls::RootCertStore::empty();
store.add_parsable_certificates(certs);
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(store)
.with_no_client_auth()
}

72
proxy/src/tls/mod.rs Normal file
View File

@@ -0,0 +1,72 @@
pub mod client_config;
pub mod postgres_rustls;
pub mod server_config;
use anyhow::Context;
use rustls::pki_types::CertificateDer;
use sha2::{Digest, Sha256};
use tracing::{error, info};
use x509_parser::oid_registry;
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L159>
pub const PG_ALPN_PROTOCOL: &[u8] = b"postgresql";
/// Channel binding parameter
///
/// <https://www.rfc-editor.org/rfc/rfc5929#section-4>
/// Description: The hash of the TLS server's certificate as it
/// appears, octet for octet, in the server's Certificate message. Note
/// that the Certificate message contains a certificate_list, in which
/// the first element is the server's certificate.
///
/// The hash function is to be selected as follows:
///
/// * if the certificate's signatureAlgorithm uses a single hash
/// function, and that hash function is either MD5 or SHA-1, then use SHA-256;
///
/// * if the certificate's signatureAlgorithm uses a single hash
/// function and that hash function neither MD5 nor SHA-1, then use
/// the hash function associated with the certificate's
/// signatureAlgorithm;
///
/// * if the certificate's signatureAlgorithm uses no hash functions or
/// uses multiple hash functions, then this channel binding type's
/// channel bindings are undefined at this time (updates to is channel
/// binding type may occur to address this issue if it ever arises).
#[derive(Debug, Clone, Copy)]
pub enum TlsServerEndPoint {
Sha256([u8; 32]),
Undefined,
}
impl TlsServerEndPoint {
pub fn new(cert: &CertificateDer<'_>) -> anyhow::Result<Self> {
let sha256_oids = [
// I'm explicitly not adding MD5 or SHA1 here... They're bad.
oid_registry::OID_SIG_ECDSA_WITH_SHA256,
oid_registry::OID_PKCS1_SHA256WITHRSA,
];
let pem = x509_parser::parse_x509_certificate(cert)
.context("Failed to parse PEM object from cerficiate")?
.1;
info!(subject = %pem.subject, "parsing TLS certificate");
let reg = oid_registry::OidRegistry::default().with_all_crypto();
let oid = pem.signature_algorithm.oid();
let alg = reg.get(oid);
if sha256_oids.contains(oid) {
let tls_server_end_point: [u8; 32] = Sha256::new().chain_update(cert).finalize().into();
info!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
Ok(Self::Sha256(tls_server_end_point))
} else {
error!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), "unknown channel binding");
Ok(Self::Undefined)
}
}
pub fn supported(&self) -> bool {
!matches!(self, TlsServerEndPoint::Undefined)
}
}

View File

@@ -18,7 +18,7 @@ mod private {
use tokio_rustls::client::TlsStream;
use tokio_rustls::TlsConnector;
use crate::config::TlsServerEndPoint;
use crate::tls::TlsServerEndPoint;
pub struct TlsConnectFuture<S> {
inner: tokio_rustls::Connect<S>,
@@ -126,16 +126,14 @@ mod private {
/// That way you can connect to PostgreSQL using `rustls` as the TLS stack.
#[derive(Clone)]
pub struct MakeRustlsConnect {
config: Arc<ClientConfig>,
pub config: Arc<ClientConfig>,
}
impl MakeRustlsConnect {
/// Creates a new `MakeRustlsConnect` from the provided `ClientConfig`.
#[must_use]
pub fn new(config: ClientConfig) -> Self {
Self {
config: Arc::new(config),
}
pub fn new(config: Arc<ClientConfig>) -> Self {
Self { config }
}
}

View File

@@ -0,0 +1,218 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::{bail, Context};
use itertools::Itertools;
use rustls::crypto::ring::{self, sign};
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use super::{TlsServerEndPoint, PG_ALPN_PROTOCOL};
pub struct TlsConfig {
pub config: Arc<rustls::ServerConfig>,
pub common_names: HashSet<String>,
pub cert_resolver: Arc<CertResolver>,
}
impl TlsConfig {
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
self.config.clone()
}
}
/// Configure TLS for the main endpoint.
pub fn configure_tls(
key_path: &str,
cert_path: &str,
certs_dir: Option<&String>,
allow_tls_keylogfile: bool,
) -> anyhow::Result<TlsConfig> {
let mut cert_resolver = CertResolver::new();
// add default certificate
cert_resolver.add_cert_path(key_path, cert_path, true)?;
// add extra certificates
if let Some(certs_dir) = certs_dir {
for entry in std::fs::read_dir(certs_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
// file names aligned with default cert-manager names
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver.add_cert_path(
&key_path.to_string_lossy(),
&cert_path.to_string_lossy(),
false,
)?;
}
}
}
}
let common_names = cert_resolver.get_common_names();
let cert_resolver = Arc::new(cert_resolver);
// allow TLS 1.2 to be compatible with older client libraries
let mut config =
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
.context("ring should support TLS1.2 and TLS1.3")?
.with_no_client_auth()
.with_cert_resolver(cert_resolver.clone());
config.alpn_protocols = vec![PG_ALPN_PROTOCOL.to_vec()];
if allow_tls_keylogfile {
// KeyLogFile will check for the SSLKEYLOGFILE environment variable.
config.key_log = Arc::new(rustls::KeyLogFile::new());
}
Ok(TlsConfig {
config: Arc::new(config),
common_names,
cert_resolver,
})
}
#[derive(Default, Debug)]
pub struct CertResolver {
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
}
impl CertResolver {
pub fn new() -> Self {
Self::default()
}
fn add_cert_path(
&mut self,
key_path: &str,
cert_path: &str,
is_default: bool,
) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
};
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
})?
};
self.add_cert(priv_key, cert_chain, is_default)
}
pub fn add_cert(
&mut self,
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
is_default: bool,
) -> anyhow::Result<()> {
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let first_cert = &cert_chain[0];
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let pem = x509_parser::parse_x509_certificate(first_cert)
.context("Failed to parse PEM object from cerficiate")?
.1;
let common_name = pem.subject().to_string();
// We need to get the canonical name for this certificate so we can match them against any domain names
// seen within the proxy codebase.
//
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
// We need to remove the wildcard prefix for the purposes of certificate selection.
//
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
//
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
// validation, so let's we can continue with any common-name
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=") {
s.to_string()
} else {
bail!("Failed to parse common name from certificate")
};
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
if is_default {
self.default = Some((cert.clone(), tls_server_end_point));
}
self.certs.insert(common_name, (cert, tls_server_end_point));
Ok(())
}
pub fn get_common_names(&self) -> HashSet<String> {
self.certs.keys().map(|s| s.to_string()).collect()
}
}
impl rustls::server::ResolvesServerCert for CertResolver {
fn resolve(
&self,
client_hello: rustls::server::ClientHello<'_>,
) -> Option<Arc<rustls::sign::CertifiedKey>> {
self.resolve(client_hello.server_name()).map(|x| x.0)
}
}
impl CertResolver {
pub fn resolve(
&self,
server_name: Option<&str>,
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
// loop here and cut off more and more subdomains until we find
// a match to get a proper wildcard support. OTOH, we now do not
// use nested domains, so keep this simple for now.
//
// With the current coding foo.com will match *.foo.com and that
// repeats behavior of the old code.
if let Some(mut sni_name) = server_name {
loop {
if let Some(cert) = self.certs.get(sni_name) {
return Some(cert.clone());
}
if let Some((_, rest)) = sni_name.split_once('.') {
sni_name = rest;
} else {
return None;
}
}
} else {
// No SNI, use the default certificate, otherwise we can't get to
// options parameter which can be used to set endpoint name too.
// That means that non-SNI flow will not work for CNAME domains in
// verify-full mode.
//
// If that will be a problem we can:
//
// a) Instead of multi-cert approach use single cert with extra
// domains listed in Subject Alternative Name (SAN).
// b) Deploy separate proxy instances for extra domains.
self.default.clone()
}
}
}

View File

@@ -9,6 +9,7 @@ default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
benchmarking = []
[dependencies]
async-stream.workspace = true
@@ -77,3 +78,4 @@ tracing-subscriber = { workspace = true, features = ["json"] }
[[bench]]
name = "receive_wal"
harness = false
required-features = ["benchmarking"]

View File

@@ -1,11 +1,7 @@
//! WAL ingestion benchmarks.
#[path = "benchutils.rs"]
mod benchutils;
use std::io::Write as _;
use benchutils::Env;
use bytes::BytesMut;
use camino_tempfile::tempfile;
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
@@ -16,6 +12,7 @@ use safekeeper::receive_wal::{self, WalAcceptor};
use safekeeper::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
};
use safekeeper::test_utils::Env;
use tokio::io::AsyncWriteExt as _;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
@@ -76,12 +73,15 @@ fn bench_process_msg(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
// Set up the Safekeeper.
let env = Env::new(fsync)?;
let mut safekeeper =
runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
let mut safekeeper = runtime.block_on(env.make_safekeeper(
NodeId(1),
TenantTimelineId::generate(),
Lsn(0),
))?;
b.iter_batched_ref(
// Pre-construct WAL records and requests. Criterion will batch them.
@@ -134,7 +134,8 @@ fn bench_wal_acceptor(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded
let env = Env::new(fsync)?;
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
let walgen =
&mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), Lsn(0));
// Create buffered channels that can fit all requests, to avoid blocking on channels.
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);
@@ -145,7 +146,7 @@ fn bench_wal_acceptor(c: &mut Criterion) {
// TODO: WalAcceptor doesn't actually need a full timeline, only
// Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate())
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.await?
.wal_residence_guard()
.await?;
@@ -239,7 +240,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
// Construct and spawn the WalAcceptor task.
let env = Env::new(fsync)?;
@@ -249,7 +250,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
runtime.block_on(async {
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate())
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.await?
.wal_residence_guard()
.await?;

View File

@@ -564,7 +564,7 @@ pub fn make_router(
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
const ALLOWLIST_ROUTES: &[&str] =
&["/v1/status", "/metrics", "/profile/cpu", "profile/heap"];
&["/v1/status", "/metrics", "/profile/cpu", "/profile/heap"];
if ALLOWLIST_ROUTES.contains(&request.uri().path()) {
None
} else {

View File

@@ -43,6 +43,9 @@ pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;
#[cfg(any(test, feature = "benchmarking"))]
pub mod test_utils;
mod timelines_global_map;
use std::sync::Arc;
pub use timelines_global_map::GlobalTimelines;

View File

@@ -94,9 +94,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
}
}
let max_next_record_lsn = match max_next_record_lsn {
Some(lsn) => lsn,
None => { continue; }
};
let batch = InterpretedWalRecords {
records,
next_record_lsn: max_next_record_lsn
next_record_lsn: Some(max_next_record_lsn),
};
tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();

View File

@@ -1,18 +1,18 @@
use std::sync::Arc;
use crate::rate_limit::RateLimiter;
use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use crate::state::{TimelinePersistentState, TimelineState};
use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::remote_timeline_path;
use crate::{control_file, wal_storage, SafeKeeperConf};
use camino_tempfile::Utf8TempDir;
use safekeeper::rate_limit::RateLimiter;
use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use safekeeper::state::{TimelinePersistentState, TimelineState};
use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use safekeeper::timelines_set::TimelinesSet;
use safekeeper::wal_backup::remote_timeline_path;
use safekeeper::{control_file, wal_storage, SafeKeeperConf};
use tokio::fs::create_dir_all;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
pub struct Env {
/// Whether to enable fsync.
pub fsync: bool,
@@ -21,7 +21,7 @@ pub struct Env {
}
impl Env {
/// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
/// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
/// enable fsyncing.
pub fn new(fsync: bool) -> anyhow::Result<Self> {
let tempdir = camino_tempfile::tempdir()?;
@@ -47,6 +47,7 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
let conf = self.make_conf(node_id);
@@ -67,9 +68,9 @@ impl Env {
safekeeper
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
term: 1,
start_streaming_at: Lsn(0),
term_history: TermHistory(vec![(1, Lsn(0)).into()]),
timeline_start_lsn: Lsn(0),
start_streaming_at: start_lsn,
term_history: TermHistory(vec![(1, start_lsn).into()]),
timeline_start_lsn: start_lsn,
}))
.await?;
@@ -82,12 +83,13 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
let conf = Arc::new(self.make_conf(node_id));
let timeline_dir = get_timeline_dir(&conf, &ttid);
let remote_path = remote_timeline_path(&ttid)?;
let safekeeper = self.make_safekeeper(node_id, ttid).await?;
let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
let timeline = Timeline::new(

View File

@@ -18,7 +18,7 @@ impl DiskWalProposer {
internal_available_lsn: Lsn(0),
prev_lsn: Lsn(0),
disk: BlockStorage::new(),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)),
}),
})
}

View File

@@ -43,13 +43,13 @@ scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true
diesel = { version = "2.1.4", features = [
diesel = { version = "2.2.6", features = [
"serde_json",
"postgres",
"r2d2",
"chrono",
] }
diesel_migrations = { version = "2.1.0" }
diesel_migrations = { version = "2.2.0" }
r2d2 = { version = "0.8.10" }
utils = { path = "../libs/utils/" }

View File

@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::error::Error as _;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
@@ -6,6 +7,7 @@ use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::StatusCode;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use serde::{Deserialize, Serialize};
@@ -28,6 +30,9 @@ struct UnshardedComputeHookTenant {
// Which node is this tenant attached to
node_id: NodeId,
// The tenant's preferred AZ, so that we may pass this on to the control plane
preferred_az: Option<AvailabilityZone>,
// Must hold this lock to send a notification.
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
}
@@ -36,6 +41,9 @@ struct ShardedComputeHookTenant {
shard_count: ShardCount,
shards: Vec<(ShardNumber, NodeId)>,
// The tenant's preferred AZ, so that we may pass this on to the control plane
preferred_az: Option<AvailabilityZone>,
// Must hold this lock to send a notification. The contents represent
// the last successfully sent notification, and are used to coalesce multiple
// updates by only sending when there is a chance since our last successful send.
@@ -64,17 +72,24 @@ enum ComputeHookTenant {
impl ComputeHookTenant {
/// Construct with at least one shard's information
fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
fn new(
tenant_shard_id: TenantShardId,
stripe_size: ShardStripeSize,
preferred_az: Option<AvailabilityZone>,
node_id: NodeId,
) -> Self {
if tenant_shard_id.shard_count.count() > 1 {
Self::Sharded(ShardedComputeHookTenant {
shards: vec![(tenant_shard_id.shard_number, node_id)],
stripe_size,
shard_count: tenant_shard_id.shard_count,
preferred_az,
send_lock: Arc::default(),
})
} else {
Self::Unsharded(UnshardedComputeHookTenant {
node_id,
preferred_az,
send_lock: Arc::default(),
})
}
@@ -120,15 +135,20 @@ impl ComputeHookTenant {
/// Set one shard's location. If stripe size or shard count have changed, Self is reset
/// and drops existing content.
fn update(
&mut self,
tenant_shard_id: TenantShardId,
stripe_size: ShardStripeSize,
node_id: NodeId,
) {
fn update(&mut self, shard_update: ShardUpdate) {
let tenant_shard_id = shard_update.tenant_shard_id;
let node_id = shard_update.node_id;
let stripe_size = shard_update.stripe_size;
let preferred_az = shard_update.preferred_az;
match self {
Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
unsharded_tenant.node_id = node_id
unsharded_tenant.node_id = node_id;
if unsharded_tenant.preferred_az.as_ref()
!= preferred_az.as_ref().map(|az| az.as_ref())
{
unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
}
}
Self::Sharded(sharded_tenant)
if sharded_tenant.stripe_size == stripe_size
@@ -146,10 +166,21 @@ impl ComputeHookTenant {
.push((tenant_shard_id.shard_number, node_id));
sharded_tenant.shards.sort_by_key(|s| s.0)
}
if sharded_tenant.preferred_az.as_ref()
!= preferred_az.as_ref().map(|az| az.as_ref())
{
sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
}
}
_ => {
// Shard count changed: reset struct.
*self = Self::new(tenant_shard_id, stripe_size, node_id);
*self = Self::new(
tenant_shard_id,
stripe_size,
preferred_az.map(|az| az.into_owned()),
node_id,
);
}
}
}
@@ -165,6 +196,7 @@ struct ComputeHookNotifyRequestShard {
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequest {
tenant_id: TenantId,
preferred_az: Option<String>,
stripe_size: Option<ShardStripeSize>,
shards: Vec<ComputeHookNotifyRequestShard>,
}
@@ -238,6 +270,10 @@ impl ComputeHookTenant {
node_id: unsharded_tenant.node_id,
}],
stripe_size: None,
preferred_az: unsharded_tenant
.preferred_az
.as_ref()
.map(|az| az.0.clone()),
}),
Self::Sharded(sharded_tenant)
if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
@@ -253,6 +289,7 @@ impl ComputeHookTenant {
})
.collect(),
stripe_size: Some(sharded_tenant.stripe_size),
preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
})
}
Self::Sharded(sharded_tenant) => {
@@ -313,6 +350,17 @@ pub(super) struct ComputeHook {
client: reqwest::Client,
}
/// Callers may give us a list of these when asking us to send a bulk batch
/// of notifications in the background. This is a 'notification' in the sense of
/// other code notifying us of a shard's status, rather than being the final notification
/// that we send upwards to the control plane for the whole tenant.
pub(crate) struct ShardUpdate<'a> {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) node_id: NodeId,
pub(crate) stripe_size: ShardStripeSize,
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
}
impl ComputeHook {
pub(super) fn new(config: Config) -> Self {
let authorization_header = config
@@ -363,6 +411,7 @@ impl ComputeHook {
tenant_id,
shards,
stripe_size,
preferred_az: _preferred_az,
} = reconfigure_request;
let compute_pageservers = shards
@@ -503,24 +552,30 @@ impl ComputeHook {
}
/// Synchronous phase: update the per-tenant state for the next intended notification
fn notify_prepare(
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
) -> MaybeSendResult {
fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
let mut state_locked = self.state.lock().unwrap();
use std::collections::hash_map::Entry;
let tenant_shard_id = shard_update.tenant_shard_id;
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Vacant(e) => {
let ShardUpdate {
tenant_shard_id,
node_id,
stripe_size,
preferred_az,
} = shard_update;
e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
preferred_az.map(|az| az.into_owned()),
node_id,
))
}
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant.update(shard_update);
tenant
}
};
@@ -608,13 +663,14 @@ impl ComputeHook {
/// if something failed.
pub(super) fn notify_background(
self: &Arc<Self>,
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
notifications: Vec<ShardUpdate>,
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
cancel: &CancellationToken,
) {
let mut maybe_sends = Vec::new();
for (tenant_shard_id, node_id, stripe_size) in notifications {
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
for shard_update in notifications {
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_prepare(shard_update);
maybe_sends.push((tenant_shard_id, maybe_send_result))
}
@@ -678,15 +734,14 @@ impl ComputeHook {
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
/// the proper pageserver nodes for a tenant.
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify(
#[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify<'a>(
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
shard_update: ShardUpdate<'a>,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_prepare(shard_update);
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
.await
}
@@ -739,6 +794,7 @@ pub(crate) mod tests {
shard_number: ShardNumber(0),
},
ShardStripeSize(12345),
None,
NodeId(1),
);
@@ -765,30 +821,32 @@ pub(crate) mod tests {
// Writing the first shard of a multi-sharded situation (i.e. in a split)
// resets the tenant state and puts it in an non-notifying state (need to
// see all shards)
tenant_state.update(
TenantShardId {
tenant_state.update(ShardUpdate {
tenant_shard_id: TenantShardId {
tenant_id,
shard_count: ShardCount::new(2),
shard_number: ShardNumber(1),
},
ShardStripeSize(32768),
NodeId(1),
);
stripe_size: ShardStripeSize(32768),
preferred_az: None,
node_id: NodeId(1),
});
assert!(matches!(
tenant_state.maybe_send(tenant_id, None),
MaybeSendResult::Noop
));
// Writing the second shard makes it ready to notify
tenant_state.update(
TenantShardId {
tenant_state.update(ShardUpdate {
tenant_shard_id: TenantShardId {
tenant_id,
shard_count: ShardCount::new(2),
shard_number: ShardNumber(0),
},
ShardStripeSize(32768),
NodeId(1),
);
stripe_size: ShardStripeSize(32768),
preferred_az: None,
node_id: NodeId(1),
});
let send_result = tenant_state.maybe_send(tenant_id, None);
let MaybeSendResult::Transmit((request, mut guard)) = send_result else {

View File

@@ -11,6 +11,7 @@ use diesel::Connection;
use itertools::Itertools;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::SafekeeperDescribeResponse;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
@@ -1241,6 +1242,18 @@ impl SafekeeperPersistence {
availability_zone_id: &self.availability_zone_id,
}
}
pub(crate) fn as_describe_response(&self) -> SafekeeperDescribeResponse {
// omit the `active` flag on purpose: it is deprecated.
SafekeeperDescribeResponse {
id: NodeId(self.id as u64),
region_id: self.region_id.clone(),
version: self.version,
host: self.host.clone(),
port: self.port,
http_port: self.http_port,
availability_zone_id: self.availability_zone_id.clone(),
}
}
}
#[derive(Insertable, AsChangeset)]

View File

@@ -1,13 +1,14 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::service;
use pageserver_api::controller_api::PlacementPolicy;
use crate::{compute_hook, service};
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
use reqwest::StatusCode;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -45,6 +46,7 @@ pub(super) struct Reconciler {
pub(crate) reconciler_config: ReconcilerConfig,
pub(crate) config: TenantConfig,
pub(crate) preferred_az: Option<AvailabilityZone>,
/// Observed state from the point of view of the reconciler.
/// This gets updated as the reconciliation makes progress.
@@ -834,9 +836,12 @@ impl Reconciler {
let result = self
.compute_hook
.notify(
self.tenant_shard_id,
node.get_id(),
self.shard.stripe_size,
compute_hook::ShardUpdate {
tenant_shard_id: self.tenant_shard_id,
node_id: node.get_id(),
stripe_size: self.shard.stripe_size,
preferred_az: self.preferred_az.as_ref().map(Cow::Borrowed),
},
&self.cancel,
)
.await;

View File

@@ -18,7 +18,7 @@ use crate::{
background_node_operations::{
Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION,
},
compute_hook::NotifyError,
compute_hook::{self, NotifyError},
drain_utils::{self, TenantShardDrain, TenantShardIterator},
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
leadership::Leadership,
@@ -46,10 +46,11 @@ use pageserver_api::{
controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
SafekeeperDescribeResponse, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
ShardsPreferredAzsResponse, TenantCreateRequest, TenantCreateResponse,
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse,
},
models::{
SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
@@ -656,11 +657,14 @@ impl Service {
// emit a compute notification for this. In the case where our observed state does not
// yet match our intent, we will eventually reconcile, and that will emit a compute notification.
if let Some(attached_at) = tenant_shard.stably_attached() {
compute_notifications.push((
*tenant_shard_id,
attached_at,
tenant_shard.shard.stripe_size,
));
compute_notifications.push(compute_hook::ShardUpdate {
tenant_shard_id: *tenant_shard_id,
node_id: attached_at,
stripe_size: tenant_shard.shard.stripe_size,
preferred_az: tenant_shard
.preferred_az()
.map(|az| Cow::Owned(az.clone())),
});
}
}
}
@@ -3568,6 +3572,11 @@ impl Service {
.iter()
.any(|i| i.generation.is_none() || i.generation_pageserver.is_none())
{
let shard_generations = generations
.into_iter()
.map(|i| (i.tenant_shard_id, (i.generation, i.generation_pageserver)))
.collect::<HashMap<_, _>>();
// One or more shards has not been attached to a pageserver. Check if this is because it's configured
// to be detached (409: caller should give up), or because it's meant to be attached but isn't yet (503: caller should retry)
let locked = self.inner.read().unwrap();
@@ -3578,6 +3587,28 @@ impl Service {
PlacementPolicy::Attached(_) => {
// This shard is meant to be attached: the caller is not wrong to try and
// use this function, but we can't service the request right now.
let Some(generation) = shard_generations.get(shard_id) else {
// This can only happen if there is a split brain controller modifying the database. This should
// never happen when testing, and if it happens in production we can only log the issue.
debug_assert!(false);
tracing::error!("Shard {shard_id} not found in generation state! Is another rogue controller running?");
continue;
};
let (generation, generation_pageserver) = generation;
if let Some(generation) = generation {
if generation_pageserver.is_none() {
// This is legitimate only in a very narrow window where the shard was only just configured into
// Attached mode after being created in Secondary or Detached mode, and it has had its generation
// set but not yet had a Reconciler run (reconciler is the only thing that sets generation_pageserver).
tracing::warn!("Shard {shard_id} generation is set ({generation:?}) but generation_pageserver is None, reconciler not run yet?");
}
} else {
// This should never happen: a shard with no generation is only permitted when it was created in some state
// other than PlacementPolicy::Attached (and generation is always written to DB before setting Attached in memory)
debug_assert!(false);
tracing::error!("Shard {shard_id} generation is None, but it is in PlacementPolicy::Attached mode!");
continue;
}
}
PlacementPolicy::Secondary | PlacementPolicy::Detached => {
return Err(ApiError::Conflict(format!(
@@ -4786,7 +4817,15 @@ impl Service {
for (child_id, child_ps, stripe_size) in child_locations {
if let Err(e) = self
.compute_hook
.notify(child_id, child_ps, stripe_size, &self.cancel)
.notify(
compute_hook::ShardUpdate {
tenant_shard_id: child_id,
node_id: child_ps,
stripe_size,
preferred_az: preferred_az_id.as_ref().map(Cow::Borrowed),
},
&self.cancel,
)
.await
{
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
@@ -7158,15 +7197,24 @@ impl Service {
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<crate::persistence::SafekeeperPersistence>, DatabaseError> {
self.persistence.list_safekeepers().await
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
Ok(self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|v| v.as_describe_response())
.collect::<Vec<_>>())
}
pub(crate) async fn get_safekeeper(
&self,
id: i64,
) -> Result<crate::persistence::SafekeeperPersistence, DatabaseError> {
self.persistence.safekeeper_get(id).await
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
self.persistence
.safekeeper_get(id)
.await
.map(|v| v.as_describe_response())
}
pub(crate) async fn upsert_safekeeper(

View File

@@ -1198,6 +1198,7 @@ impl TenantShard {
detach,
reconciler_config,
config: self.config.clone(),
preferred_az: self.preferred_az_id.clone(),
observed: self.observed.clone(),
original_observed: self.observed.clone(),
compute_hook: compute_hook.clone(),

View File

@@ -310,7 +310,7 @@ pub(crate) enum BlobDataParseResult {
index_part_generation: Generation,
s3_layers: HashSet<(LayerName, Generation)>,
},
/// The remains of a deleted Timeline (i.e. an initdb archive only)
/// The remains of an uncleanly deleted Timeline or aborted timeline creation(e.g. an initdb archive only, or some layer without an index)
Relic,
Incorrect {
errors: Vec<String>,
@@ -346,7 +346,7 @@ pub(crate) async fn list_timeline_blobs(
match res {
ListTimelineBlobsResult::Ready(data) => Ok(data),
ListTimelineBlobsResult::MissingIndexPart(_) => {
// Retry if index is missing.
// Retry if listing raced with removal of an index
let data = list_timeline_blobs_impl(remote_client, id, root_target)
.await?
.into_data();
@@ -358,7 +358,7 @@ pub(crate) async fn list_timeline_blobs(
enum ListTimelineBlobsResult {
/// Blob data is ready to be intepreted.
Ready(RemoteTimelineBlobData),
/// List timeline blobs has layer files but is missing [`IndexPart`].
/// The listing contained an index but when we tried to fetch it, we couldn't
MissingIndexPart(RemoteTimelineBlobData),
}
@@ -467,19 +467,19 @@ async fn list_timeline_blobs_impl(
match index_part_object.as_ref() {
Some(selected) => index_part_keys.retain(|k| k != selected),
None => {
// It is possible that the branch gets deleted after we got some layer files listed
// and we no longer have the index file in the listing.
errors.push(
// This case does not indicate corruption, but it should be very unusual. It can
// happen if:
// - timeline creation is in progress (first layer is written before index is written)
// - timeline deletion happened while a stale pageserver was still attached, it might upload
// a layer after the deletion is done.
tracing::info!(
"S3 list response got no index_part.json file but still has layer files"
.to_string(),
);
return Ok(ListTimelineBlobsResult::MissingIndexPart(
RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
unused_index_keys: index_part_keys,
unknown_keys,
},
));
return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Relic,
unused_index_keys: index_part_keys,
unknown_keys,
}));
}
}

View File

@@ -8,6 +8,7 @@ pytest_plugins = (
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.paths",
"fixtures.compute_migrations",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
import os
from typing import TYPE_CHECKING
import pytest
from fixtures.paths import BASE_DIR
if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path
COMPUTE_MIGRATIONS_DIR = BASE_DIR / "compute_tools" / "src" / "migrations"
COMPUTE_MIGRATIONS_TEST_DIR = COMPUTE_MIGRATIONS_DIR / "tests"
COMPUTE_MIGRATIONS = sorted(next(os.walk(COMPUTE_MIGRATIONS_DIR))[2])
NUM_COMPUTE_MIGRATIONS = len(COMPUTE_MIGRATIONS)
@pytest.fixture(scope="session")
def compute_migrations_dir() -> Iterator[Path]:
"""
Retrieve the path to the compute migrations directory.
"""
yield COMPUTE_MIGRATIONS_DIR
@pytest.fixture(scope="session")
def compute_migrations_test_dir() -> Iterator[Path]:
"""
Retrieve the path to the compute migrations test directory.
"""
yield COMPUTE_MIGRATIONS_TEST_DIR

View File

@@ -55,3 +55,17 @@ class EndpointHttpClient(requests.Session):
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
return res.text
def configure_failpoints(self, *args: tuple[str, str]) -> None:
body: list[dict[str, str]] = []
for fp in args:
body.append(
{
"name": fp[0],
"action": fp[1],
}
)
res = self.post(f"http://localhost:{self.port}/failpoints", json=body)
res.raise_for_status()

Some files were not shown because too many files have changed in this diff Show More