Compare commits

..

55 Commits

Author SHA1 Message Date
Devin AI
7cdb292b37 Add pg_tenant_only_auth_public_key_path check to safekeeper authentication
Co-Authored-By: John Spray <john@neon.tech>
2025-04-10 13:49:18 +00:00
Devin AI
a55e0192dc Add missing bail import to pageserver.rs
Co-Authored-By: John Spray <john@neon.tech>
2025-04-10 10:34:31 +00:00
Devin AI
fa3ceab30e Fix formatting issues
Co-Authored-By: John Spray <john@neon.tech>
2025-04-10 10:15:37 +00:00
Devin AI
7710c18761 Check both HTTP and PostgreSQL authentication in safekeeper
Co-Authored-By: John Spray <john@neon.tech>
2025-04-10 08:52:25 +00:00
Devin AI
1e14d784f4 Update allowed_errors.py to include new dev mode warning message
Co-Authored-By: John Spray <john@neon.tech>
2025-04-10 08:51:39 +00:00
Devin AI
2574bbe072 Update dev mode warning message to match storage controller
Co-Authored-By: John Spray <john@neon.tech>
2025-04-10 08:51:15 +00:00
Devin AI
d74c715602 Check pg_auth_type in pageserver authentication check
Co-Authored-By: John Spray <john@neon.tech>
2025-04-10 08:50:04 +00:00
Devin AI
69cfd1f7e0 Remove redundant --dev flag in neon_local.rs
Co-Authored-By: John Spray <john@neon.tech>
2025-04-10 08:47:21 +00:00
Devin AI
5dea3e2195 storage: require authentication by default, unless --dev is specified (#11237)
Co-Authored-By: John Spray <john@neon.tech>
2025-04-10 08:43:15 +00:00
Tristan Partin
a04e33ceb6 Remove --spec-json argument from compute_ctl (#11510)
It isn't used by the production control plane or neon_local. The removal
simplifies compute spec logic just a little bit more since we can remove
any notion of whether we should allow live reconfigurations.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-09 22:39:54 +00:00
Alex Chi Z.
af0be11503 fix(pageserver): ensure gc-compaction gets preempted by L0 (#11512)
## Problem

Part of #9114 

## Summary of changes

Gc-compaction flag was not correctly set, causing it not getting
preempted by L0.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-09 21:41:11 +00:00
Alex Chi Z.
405a17bf0b fix(pageserver): ensure gc-compaction gets preempted by L0 (#11512)
## Problem

Part of #9114 

## Summary of changes

Gc-compaction flag was not correctly set, causing it not getting
preempted by L0.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-09 20:57:50 +00:00
Erik Grinaker
63ee8e2181 test_runner: ignore .___temp files in evict_random_layers (#11509)
## Problem

`test_location_conf_churn` often fails with `neither image nor delta
layer`, but doesn't say what the file actually is. However, past local
failures have indicated that it might be `.___temp` files.

Touches https://github.com/neondatabase/neon/issues/11348.

## Summary of changes

Ignore `.___temp` files when evicting local layers, and include the file
name in the error message.
2025-04-09 19:03:49 +00:00
Alex Chi Z.
2c21a65b0b feat(pageserver): add gc-compaction time-to-first-item stats (#11475)
## Problem

In some cases gc-compaction doesn't respond to the L0 compaction yield
notifier. I suspect it's stuck on getting the first item, and if so, we
probably need to let L0 yield notifier preempt `next_with_trace`.

## Summary of changes

- Add `time_to_first_kv_pair` to gc-compaction statistics.
- Inverse the ratio so that smaller ratio -> better compaction ratio.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-09 18:07:58 +00:00
Alex Chi Z.
ec66b788e2 fix(pageserver): use different walredo retry setting for gc-compaction (#11497)
## Problem

Not a complete fix for https://github.com/neondatabase/neon/issues/11492
but should work for a short term.

Our current retry strategy for walredo is to retry every request exactly
once. This retry doesn't make sense because it retries all requests
exactly once and each error is expected to cause process restart and
cause future requests to fail. I'll explain it with a scenario of two
threads requesting redos: one with an invalid history (that will cause
walredo to panic) and another that has a correct redo sequence.

First let's look at how we handle retries right now in
do_with_walredo_process. At the beginning of the function it will spawn
a new process if there's no existing one. Then it will continue to redo.
If the process fails, the first process that encounters the error will
remove the walredo process object from the OnceCell, so that the next
time it gets accessed, a new process will be spawned; if it is the last
one that uses the old walredo process, it will kill and wait the process
in `drop(proc)`. I'm skeptical whether this works under races but I
think this is not the root cause of the problem. In this retry handler,
if there are N requests attached to a walredo process and the i-th
request fails (panics the walredo), all other N-i requests will fail and
they need to retry so that they can access a new walredo process.

```
time       ---->
proc        A                 None   B
request 1   ^-----------------^ fail
            uses A for redo   replace with None
request 2      ^-------------------- fail
               uses A for redo
request 3             ^----------------^ fail
                      uses A for redo  last ref, wait for A to be killed
request 4                            ^---------------
                                     None, spawn new process B
```

The problem is with our retry strategy. Normally, for a system that we
want to retry on, the probability of errors for each of the requests are
uncorrelated. However, in walredo, a prior request that panics the
walredo process will cause all future walredo on that process to fail
(that's correlated).

So, back to the situation where we have 2 requests where one will
definitely fail and the other will succeed and we get the following
sequence, where retry attempts = 1,

* new walredo process A starts.
* request 1 (invalid) being processed on A and panics A, waiting for
retry, remove process A from the process object.
* request 2 (valid) being processed on A and receives pipe broken /
poisoned process error, waiting for retry, wait for A to be killed --
this very likely takes a while and cannot finish before request 1 gets
processed again
* new walredo process B starts.
* request 1 (invalid) being processed again on B and panics B, the whole
request fail.
* request 2 (valid) being processed again on B, and get a poisoned error
again.

```
time       ---->
proc        A                 None           B                    None
request 1   ^-----------------^--------------^--------------------^
            spawn A for redo  fail          spawn B for redo     fail
request 2      ^--------------------^-------------------------^------------^
               use A for redo       fail, wait to kill A      B for redo   fail again
```

In such cases, no matter how we set n_attempts, as long as the retry
count applies to all requests, this sequence is bound to fail both
requests because of how they get sequenced; while we could potentially
make request 2 successful.

There are many solutions to this -- like having a separate walredo
manager for compactions, or define which errors are retryable (i.e.,
broken pipe can be retried, while real walredo error won't be retried),
or having a exclusive big lock over the whole redo process (the current
one is very fine-grained). In this patch, we go with a simple approach:
use different retry attempts for different types of requests.

For gc-compaction, the attempt count is set to 0, so that it never
retries and consequently stops the compaction process -- no more redo
will be issued from gc-compaction. Once the walredo process gets
restarted, the normal read requests will proceed normally.

## Summary of changes

Add redo_attempt for each reconstruct value request to set different
retry policies.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Erik Grinaker <erik@neon.tech>
2025-04-09 18:01:31 +00:00
Peter Bendel
af12647b9d large tenant oltp benchmark: reindex with downtime (remove concurrently) (#11498)
## Problem

our large oltp benchmark runs very long - we want to remove the duration
of the reindex step.
we don't run concurrent workload anyhow but added "concurrently" only to
have a "prod-like" approach. But if it just doubles the time we report
because it requires two instead of one full table scan we can remove it

## Summary of changes

remove keyword concurrently from the reindex step
2025-04-09 17:11:00 +00:00
Tristan Partin
1c237d0c6d Move compute_ctl claims struct into public API (#11505)
This is preparatory work for teaching neon_local to pass the
Authorization header to compute_ctl.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-09 16:58:44 +00:00
Tristan Partin
afd34291ca Make neon_local token generation generic over claims (#11507)
Instead of encoding a certain structure for claims, let's allow the
caller to specify what claims be encoded.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-09 16:41:29 +00:00
Vlad Lazar
66f80e77ba tests/performance: reconcile until idle before benchmark (#11435)
We'd like to run benchmarks starting from a steady state. To this end,
do a reconciliation round before proceeding with the benchmark.

This is useful for benchmarks that use tenant dir snapshots since a
non-standard tenant configuration is used to generate the snapshot. The
storage controller is not aware of the non default tenant configuration
and will reconcile while the bench is running.
2025-04-09 16:32:19 +00:00
Conrad Ludgate
72832b3214 chore: fix clippy lints from nightly-2025-03-16 (#11273)
I like to run nightly clippy every so often to make our future rust
upgrades easier. Some notable changes:

* Prefer `next_back()` over `last()`. Generic iterators will implement
`last()` to run forward through the iterator until the end.

* Prefer `io::Error::other()`.

* Use implicit returns

One case where I haven't dealt with the issues is the now
[more-sensitive "large enum variant"
lint](https://github.com/rust-lang/rust-clippy/pull/13833). I chose not
to take any decisions around it here, and simply marked them as allow
for now.
2025-04-09 15:04:42 +00:00
Vlad Lazar
d11f23a341 pageserver: refactor read path for multi LSN batching support (#11463)
## Problem

We wish to improve pageserver batching such that one batch can contain
requests for
pages at different LSNs. The current shape of the code doesn't lend
itself to the change.

## Summary of changes

Refactor the read path such that the fringe gets initialized upfront.
This is where the multi LSN
change will plug in. A couple other small changes fell out of this.

There should be NO behaviour change here. If you smell one, shout!

I recommend reviewing commits individually (intentionally made them as
small as possible).

Related: https://github.com/neondatabase/neon/issues/10765
2025-04-09 13:17:02 +00:00
Dmitrii Kovalkov
e7502a3d63 pageserver: return 412 PreconditionFailed in get_timestamp_of_lsn if timestamp is not found (#11491)
## Problem
Now `get_timestamp_of_lsn` returns `404 NotFound` if there is no clog
pages for given LSN, and it's difficult to distinguish from other 404
errors. A separate status code for this error will allow the control
plane to handle this case.
- Closes: https://github.com/neondatabase/neon/issues/11439
- Corresponding PR in control plane:
https://github.com/neondatabase/cloud/pull/27125

## Summary of changes
- Return `412 PreconditionFailed` instead of `404 NotFound` if no
timestamp is fond for given LSN.

I looked briefly through the current error handling code in cloud.git
and the status code change should not affect anything for the existing
code. Change from the corresponding PR also looks fine and should work
with the current PS status code. Additionally, here is OK to merge it
from control plane team:
https://github.com/neondatabase/neon/issues/11439#issuecomment-2789327552

---------

Co-authored-by: John Spray <john@neon.tech>
2025-04-09 13:16:15 +00:00
Heikki Linnakangas
ef8101a9be refactor: Split "communicator" routines to a separate source file (#11459)
pagestore_smgr.c had grown pretty large. Split into two parts, such
that the smgr routines that PostgreSQL code calls stays in
pagestore_smgr.c, and all the prefetching logic and other lower-level
routines related to communicating with the pageserver are moved to a
new source file, "communicator.c".

There are plans to replace communicator parts with a new
implementation. See https://github.com/neondatabase/neon/pull/10799.
This commit doesn't implement any of the new things yet, but it is
good preparation for it. I'm imagining that the new implementation
will approximately replace the current "communicator.c" code, exposing
roughly the same functions to pagestore_smgr.c.

This commit doesn't change any functionality or behavior, or make any
other changes to the existing code: It just moves existing code
around.
2025-04-09 12:28:59 +00:00
Arpad Müller
d2825e72ad Add is_stopping check around critical macro in walreceiver (#11496)
The timeline stopping state is set much earlier than the cancellation
token is fired, so by checking for the stopping state, we can prevent
races with timeline shutdown where we issue a cancellation error but the
cancellation token hasn't been fired yet.

Fix #11427.
2025-04-09 12:17:45 +00:00
Erik Grinaker
a6ff8ec3d4 storcon: change default stripe size to 16 MB (#11168)
## Problem

The current stripe size of 256 MB is a bit large, and can cause load
imbalances across shards. A stripe size of 16 MB appears more reasonable
to avoid hotspots, although we don't see evidence of this in benchmarks.

Resolves https://github.com/neondatabase/cloud/issues/25634.
Touches https://github.com/neondatabase/cloud/issues/21870.

## Summary of changes

* Change the default stripe size to 16 MB.
* Remove `ShardParameters::DEFAULT_STRIPE_SIZE`, and only use
`pageserver_api::shard::DEFAULT_STRIPE_SIZE`.
* Update a bunch of tests that assumed a certain stripe size.
2025-04-09 08:41:38 +00:00
Dmitrii Kovalkov
cf62017a5b storcon: add https metrics for pageservers/safekeepers (#11460)
## Problem
Storcon will not start up if `use_https` is on and there are some
pageservers or safekeepers without https port in the database. Metrics
"how many nodes with https we have in DB" will help us to make sure that
`use_https` may be turned on safely.
- Part of https://github.com/neondatabase/cloud/issues/25526

## Summary of changes
- Add `storage_controller_https_pageserver_nodes`,
`storage_controller_safekeeper_nodes` and
`storage_controller_https_safekeeper_nodes` Prometheus metrics.
2025-04-09 08:33:49 +00:00
Erik Grinaker
c610f3584d test_runner: tweak test_create_snapshot compaction (#11495)
## Problem

With the recent improvements to L0 compaction responsiveness,
`test_create_snapshot` now ends up generating 10,000 layer files
(compared to 1,000 in previous snapshots). This increases the snapshot
size by 4x, and significantly slows down tests.

## Summary of changes

Increase the target layer size from 128 KB to 256 KB, and the L0
compaction threshold from 1 to 5. This reduces the layer count from
about 10,000 to 1,000.
2025-04-09 06:52:49 +00:00
Konstantin Knizhnik
c9ca8b7c4a One more fix for unlogged build support in DEBUG_COMPARE_LOCAL (#11474)
## Problem

Support of unlogged build in DEBUG_COMPARE_LOCAL.
Neon SMGR treats present of local file as indicator of unlogged
relations.
But it doesn't work in  DEBUG_COMPARE_LOCAL mode.

## Summary of changes

Use INIT_FORKNUM as indicator of unlogged file and create this file
while unlogged index build.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-09 05:14:29 +00:00
Erik Grinaker
7679b63a2c pageserver: persist stripe size in tenant manifest for tenant_import (#11181)
## Problem

`tenant_import`, used to import an existing tenant from remote storage
into a storage controller for support and debugging, assumed
`DEFAULT_STRIPE_SIZE` since this can't be recovered from remote storage.
In #11168, we are changing the stripe size, which will break
`tenant_import`.

Resolves #11175.

## Summary of changes

* Add `stripe_size` to the tenant manifest.
* Add `TenantScanRemoteStorageShard::stripe_size` and return from
`tenant_scan_remote` if present.
* Recover the stripe size during`tenant_import`, or fall back to 32768
(the original default stripe size).
* Add tenant manifest compatibility snapshot:
`2025-04-08-pgv17-tenant-manifest-v1.tar.zst`

There are no cross-version concerns here, since unknown fields are
ignored during deserialization where relevant.
2025-04-08 20:43:27 +00:00
Erik Grinaker
d177654e5f gitignore: add /artifact_cache (#11493)
## Problem

This is generated e.g. by `test_historic_storage_formats`, and causes
VSCode to list all the contained files as new.

## Summary of changes

Add `/artifact_cache` to `.gitignore`.
2025-04-08 16:57:10 +00:00
Alex Chi Z.
a09c933de3 test(pageserver): add conditional append test record (#11476)
## Problem

For future gc-compaction tests when we support
https://github.com/neondatabase/neon/issues/10395

## Summary of changes

Add a new type of neon test WAL record that is conditionally applied
(i.e., only when image == the specified value). We can use this to mock
the situation where we lose some records in the middle, firing an error,
and see how gc-compaction reacts to it.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-08 16:08:44 +00:00
Mikhail Kot
6138d61592 Object storage proxy (#11357)
Service targeted for storing and retrieving LFC prewarm data.
Can be used for proxying S3 access for Postgres extensions like
pg_mooncake as well.

Requests must include a Bearer JWT token.
Token is validated using a pemfile (should be passed in infra/).

Note: app is not tolerant to extra trailing slashes, see app.rs
`delete_prefix` test for comments.

Resolves: https://github.com/neondatabase/cloud/issues/26342
Unrelated changes: gate a `rename_noreplace` feature and disable it in
`remote_storage` so as `object_storage` can be built with musl
2025-04-08 14:54:53 +00:00
Roman Zaynetdinov
a7142f3bc6 Configure rsyslog for logs export using the spec (#11338)
- Work on https://github.com/neondatabase/cloud/issues/24896
- Cplane part https://github.com/neondatabase/cloud/pull/26808

Instead of reconfiguring rsyslog via an API endpoint [we have
agreed](https://neondb.slack.com/archives/C04DGM6SMTM/p1743513810964509?thread_ts=1743170369.865859&cid=C04DGM6SMTM)
to have a new `logs_export_host` field as part of the compute spec.

---------

Co-authored-by: Tristan Partin <tristan@neon.tech>
2025-04-08 14:03:09 +00:00
Dmitrii Kovalkov
7791a49dd4 fix(tests): improve test_scrubber_tenant_snapshot stability (#11471)
## Problem
`test_scrubber_tenant_snapshot` is flaky with `request was dropped`
errors. More details are in the issue.
- Closes: https://github.com/neondatabase/neon/issues/11278

## Summary of changes
- Disable shard scheduling during pageservers restart
- Add `reconcile_until_idle` in the end of the test
2025-04-08 10:03:38 +00:00
dependabot[bot]
8a6d0dccaa build(deps): bump tokio from 1.38.0 to 1.38.2 in /test_runner/pg_clients/rust/tokio-postgres in the cargo group across 1 directory (#11478)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-04-08 10:01:15 +00:00
Heikki Linnakangas
7ffcbfde9a refactor: Move LFC function prototypes to separate header file (#11458)
Also, move the call to the lfc_init() function. It was weird to have it
in libpagestore.c, when libpagestore.c otherwise had nothing to do with
the LFC. Move it directly into _PG_init()
2025-04-08 09:03:56 +00:00
Konstantin Knizhnik
b2a0b2e9dd Skip hole tags in local_cache view (#11454)
## Problem

If the local file cache is shrunk, so that we punch some holes in the
underlying file, the local_cache view displays the holes incorrectly.
See https://github.com/neondatabase/neon/issues/10770

## Summary of changes

Skip hole tags in the local_cache view.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-08 03:52:50 +00:00
Alex Chi Z.
0875dacce0 fix(pageserver): more aggressively yield in gc-compaction, degrade errors to warnings (#11469)
## Problem

Fix various small issues discovered during gc-compaction rollout.

## Summary of changes

- Log level changes: if errors are from gc-compaction, fire a warning
instead of errors or critical errors.
- Yield to L0 compaction more aggressively. Instead of checking every 1k
keys, we check on every key. Sometimes a single key reconstruct takes a
long time.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-07 21:19:06 +00:00
Erik Grinaker
99d8788756 pageserver: improve tenant manifest lifecycle (#11328)
## Problem

Currently, the tenant manifest is only uploaded if there are offloaded
timelines. The checks are also a bit loose (e.g. only checks number of
offloaded timelines). We want to start using the manifest for other
things too (e.g. stripe size).

Resolves #11271.

## Summary of changes

This patch ensures that a tenant manifest always exists. The lifecycle
is:

* During preload, fetch the existing manifest, if any.
* During attach, upload a tenant manifest if it differs from the
preloaded one (or does not exist).
* Upload a new manifest as needed, if it differs from the last-known
manifest (ignoring version number).
* On splits, pre-populate the manifest from the parent.
* During Pageserver physical GC, remove old manifests but keep the
latest 2 generations.

This will cause nearly all existing tenants to upload a new tenant
manifest on their first attach after this change. Attaches are
concurrency-limited in the storage controller, so we expect this will be
fine.

Also updates `make_broken` to automatically log at `INFO` level when the
tenant has been cancelled, to avoid spurious error logs during shutdown.
2025-04-07 19:10:36 +00:00
Erik Grinaker
26c5c7e942 pageserver: set Stopping state on attach cancellation (#11462)
## Problem

If a tenant is cancelled (e.g. due to Pageserver shutdown) during
attach, it is set to `Broken`. This results both in error log spam and
500 responses for clients -- shutdown is supposed to return 503
responses which can be retried.

This becomes more likely to happen with #11328, where we perform tenant
manifest downloads/uploads during attach.

## Summary of changes

Set tenant state to `Stopping` when attach fails and the tenant is
cancelled, downgrading the log messages to INFO. This introduces two
variants of `Stopping` -- with and without a caller barrier -- where the
latter is used to signal attach cancellation.
2025-04-07 17:56:56 +00:00
Arpad Müller
8a2b19f467 Allow potential warning in test_storcon_create_delete_sk_down (#11466)
Since merging #11400 and addition of
`test_storcon_create_delete_sk_down`, we've seen an error occur multiple
times.

https://github.com/neondatabase/neon/pull/11400#issuecomment-2782528369
2025-04-07 16:52:54 +00:00
Arpad Müller
486872dd28 Add support to specify auth token via --auth-token-path (#11443)
Before we specified the JWT via `SAFEKEEPER_AUTH_TOKEN`, but env vars
are quite public, both in procfs as well as the unit files. So add a way
to put the auth token into a file directly.

context: https://neondb.slack.com/archives/C033RQ5SPDH/p1743692566311099
2025-04-07 16:12:04 +00:00
Alex Chi Z.
d37e90f430 fix(pageserver): allow shard ancestor compaction to be cancelled (#11452)
## Problem

https://github.com/neondatabase/neon/issues/11330
https://github.com/neondatabase/neon/issues/11358

## Summary of changes

Looking at the staging log, a few tenants right after shard split are
stuck on shutdown because they are running shard ancestor compaction.
The compaction does not respect the cancellation token.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-07 16:01:21 +00:00
Konstantin Knizhnik
8eb701d706 Save FSM/VM pages on normal shutdown (#11449)
## Problem

See https://neondb.slack.com/archives/C03QLRH7PPD/p1743746717119179

We wallow FSM/VM pages when they are written to disk to persist them in
PS.
But it is not happen during shutdown checkpoint, because writing to WAL
during checkpoint cause Postgres panic.

## Summary of changes

Move `CheckPointBuffers` call to `PreCheckPointGuts`

Postgres PRs:
https://github.com/neondatabase/postgres/pull/615
https://github.com/neondatabase/postgres/pull/614
https://github.com/neondatabase/postgres/pull/613
https://github.com/neondatabase/postgres/pull/612

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-07 13:56:55 +00:00
Conrad Ludgate
85a515c176 update tokio for RUSTSEC-2025-0023 (#11464) 2025-04-07 13:33:56 +00:00
Christian Schwarz
aa88279681 fix(storcon/http): node status API returns serialized runtime object (#11461)
The Serialize impl on the `Node` type is for the `/debug` endpoint only.
Committed APIs should use the `NodeDescribeResponse`.

Refs
- fixes https://github.com/neondatabase/neon/issues/11326
- found while working on admin UI change
https://github.com/neondatabase/cloud/pull/26207
2025-04-07 12:23:40 +00:00
Heikki Linnakangas
b2a670c765 refactor: Use same prototype for neon_read_at_lsn on all PG versions (#11457)
The 'neon_read' function needs to have a different prototype on PG < 16,
because it's part of the smgr interface. But neon_read_at_lsn doesn't
have that restriction.
2025-04-07 11:04:36 +00:00
a-masterov
ad9655bb01 Fix the errors in pg_regress test running on the staging. (#11432)
## Problem
The shared libraries preloaded by default interfered with the
`pg_regress` tests on staging, causing wrong results
## Summary of changes
The projects used for these tests are now free from unnecessary
extensions. Some changes were made in patches.
2025-04-06 19:30:21 +00:00
Heikki Linnakangas
1a87975d95 Misc cleanup of #includes and comments in the neon extension (#11456)
Remove useless and often wrong IDENTIFICATION comments. PostgreSQL
sources have them, mostly for historical reasons, but there's no need
for us to copy that style.

Remove unnecessary #includes in header files, putting the #includes
directly in the .c files that need them. The principle is that a header
file should #include other header files if they need definitions from
them, such that each header file can be compiled on its own, but not
other #includes. (There are tools to enforce that, but this was just a
manual clean up of violations that I happened to spot.)
2025-04-06 15:34:13 +00:00
dependabot[bot]
417b2781d9 build(deps): bump openssl from 0.10.70 to 0.10.72 in /test_runner/pg_clients/rust/tokio-postgres in the cargo group across 1 directory (#11455)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-04-05 13:00:51 +00:00
Suhas Thalanki
2841f1ffa5 removal of pg_embedding (#11440)
## Problem

The `pg_embedding` extension has been deprecated and can cause issues
with recent changes such as with
https://github.com/neondatabase/neon/issues/10973

Issue: `PG:2025-04-03 15:39:25.498 GMT
ttid=a4de5bee50225424b053dc64bac96d87/d6f3891b8f968458b3f7edea58fb3c6f
sqlstate=58P01 [15526] ERROR: could not load library
"/usr/local/lib/embedding.so": /usr/local/lib/embedding.so: undefined
symbol: SetLastWrittenLSNForRelation`

## Summary of changes

Removed `pg_embedding` extension from the compute image.
2025-04-04 18:21:23 +00:00
Christian Schwarz
aad410c8f1 improve ondemand-download latency observability (#11421)
## Problem

We don't have metrics to exactly quantify the end user impact of
on-demand downloads.

Perf tracing is underway (#11140) to supply us with high-resolution
*samples*.

But it will also be useful to have some aggregate per-timeline and
per-instance metrics that definitively contain all observations.

## Summary of changes

This PR consists of independent commits that should be reviewed
independently.

However, for convenience, we're going to merge them together.

- refactor(metrics): measure_remote_op can use async traits
- impr(pageserver metrics): task_kind dimension for
remote_timeline_client latency histo
  - implements https://github.com/neondatabase/cloud/issues/26800
- refs
https://github.com/neondatabase/cloud/issues/26193#issuecomment-2769705793
- use the opportunity to rename the metric and add a _global suffix;
checked grafana export, it's only used in two personal dashboards, one
of them mine, the other by Heikki
- log on-demand download latency for expensive-to-query but precise
ground truth
- metric for wall clock time spent waiting for on-demand downloads

## Refs

- refs https://github.com/neondatabase/cloud/issues/26800
- a bunch of minor investigations / incidents into latency outliers
2025-04-04 18:04:39 +00:00
Christian Schwarz
4f94751b75 pageserver config: ignore+warn about unknown fields (instead of deny_unknown_fields) (#11275)
# Refs
- refs https://github.com/neondatabase/neon/issues/8915
- discussion thread:
https://neondb.slack.com/archives/C033RQ5SPDH/p1742406381132599
- stacked atop https://github.com/neondatabase/neon/pull/11298
- corresponding internal docs update that illustrates how this PR
removes friction: https://github.com/neondatabase/docs/pull/404

# Problem

Rejecting `pageserver.toml`s with unknown fields adds friction,
especially when using `pageserver.toml` fields as feature flags that
need to be decommissioned.

See the added paragraphs on `pageserver_api::models::ConfigToml` for
details on what kind of friction it causes.

Also read the corresponding internal docs update linked above to see a
more imperative guide for using `pageserver.toml` flags as feature
flags.

# Solution

## Ignoring unknown fields

Ignoring is the serde default behavior.

So, just remove `serde(deny_unknown_fields)` from all structs in
`pageserver_api::config::ConfigToml`
`pageserver_api::config::TenantConfigToml`.

I went through all the child fields and verified they don't use
`deny_unknown_fields` either, including those shared with
`pageserver_api::models`.

## Warning about unknown fields

We still want to warn about unknown fields to 
- be informed about typos in the config template
- be reminded about feature-flag style configs that have been cleaned up
in code but not yet in config templates

We tried `serde_ignore` (cf draft #11319) but it doesn't work with
`serde(flatten)`.

The solution we arrived at is to compare the on-disk TOML with the TOML
that we produce if we serialize the `ConfigToml` again.
Any key specified in the on-disk TOML but not present in the serialized
TOML is flagged as an ignored key.
The mechanism to do it is a tiny recursive decent visitor on the
`toml_edit::DocumentMut`.

# Future Work

Invalid config _values_ in known fields will continue to fail pageserver
startup.
See
- https://github.com/neondatabase/cloud/issues/24349
for current worst case impact to deployments & ideas to improve.
2025-04-04 17:30:58 +00:00
Christian Schwarz
6ee84d985a impr(perf tracing): ability to correlate with page_service logs (#11398)
# Problem

Current perf tracing fields do not allow answering the question what a
specific Postgres backend was waiting for.

# Background

For Pageserver logs, we set the backend PID as the libpq
`application_name` on the compute side, and funnel that into the a
tracing field for the spans that emit to the global tracing subscriber.

# Solution

Funnel `application_name`, and the other fields that we use in the
logging spans, into the root span for perf tracing.

# Refs

- fixes https://github.com/neondatabase/neon/issues/11393
- stacked atop https://github.com/neondatabase/neon/pull/11433
- epic: https://github.com/neondatabase/neon/issues/9873
2025-04-04 15:13:54 +00:00
JC Grünhage
295be03a33 impr(ci): send clearer notifications to slack when retrying container image pushes (#11447)
## Problem
We've started sending slack notifications for failed container image
pushes that are being retried. There are more messages coming in than
expected, so clicking through the link to see what image failed is
happening more often than we hoped.

## Summary of changes
- Make slack notifications clearer, including whether the job succeeded
and what retries have happened.
- Log failures/retries in step more clearly, so that you can easily see
when something fails.
2025-04-04 14:56:41 +00:00
155 changed files with 6146 additions and 5651 deletions

View File

@@ -19,6 +19,7 @@
!pageserver/
!pgxn/
!proxy/
!object_storage/
!storage_scrubber/
!safekeeper/
!storage_broker/

View File

@@ -2,6 +2,9 @@ import json
import os
import subprocess
RED = "\033[91m"
RESET = "\033[0m"
image_map = os.getenv("IMAGE_MAP")
if not image_map:
raise ValueError("IMAGE_MAP environment variable is not set")
@@ -29,9 +32,14 @@ while len(pending) > 0:
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
failures.append((" ".join(cmd), result.stdout))
failures.append((" ".join(cmd), result.stdout, target))
pending.append((source, target))
print(
f"{RED}[RETRY]{RESET} Push failed for {target}. Retrying... (failure count: {len(failures)})"
)
print(result.stdout)
if len(failures) > 0 and (github_output := os.getenv("GITHUB_OUTPUT")):
failed_targets = [target for _, _, target in failures]
with open(github_output, "a") as f:
f.write("slack_notify=true\n")
f.write(f"push_failures={json.dumps(failed_targets)}\n")

View File

@@ -110,12 +110,19 @@ jobs:
IMAGE_MAP: ${{ inputs.image-map }}
- name: Notify Slack if container image pushing fails
if: steps.push.outputs.slack_notify == 'true' || failure()
if: steps.push.outputs.push_failures || failure()
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ vars.SLACK_ON_CALL_DEVPROD_STREAM }}
text: |
Pushing container images failed in <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
text: >
*Container image pushing ${{
steps.push.outcome == 'failure' && 'failed completely' || 'succeeded with some retries'
}}* in
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
${{ steps.push.outputs.push_failures && format(
'*Failed targets:*\n• {0}', join(fromJson(steps.push.outputs.push_failures), '\n• ')
) || '' }}

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
/artifact_cache
/pg_install
/target
/tmp_check

1125
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -40,8 +40,7 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"lambda/aztraffic",
"lambda/pod_info_dumper",
"object_storage",
]
[workspace.package]
@@ -185,7 +184,7 @@ test-context = "0.3"
thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.41", features = ["macros"] }
tokio = { version = "1.43.1", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
@@ -210,6 +209,7 @@ tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"
test-log = { version = "0.2.17", default-features = false, features = ["log"] }
twox-hash = { version = "1.6.3", default-features = false }
typed-json = "0.1"
url = "2.2"
@@ -344,12 +344,3 @@ inherits = "release"
debug = false # true = 2 = all symbols, 1 = line only
opt-level = "z"
lto = true
[profile.release-lambda-function]
inherits = "release"
lto = true
opt-level = "z"
codegen-units = 1
panic = "abort"
debug = false
strip = true

View File

@@ -89,6 +89,7 @@ RUN set -e \
--bin storage_broker \
--bin storage_controller \
--bin proxy \
--bin object_storage \
--bin neon_local \
--bin storage_scrubber \
--locked --release
@@ -121,6 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin

View File

@@ -1022,39 +1022,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/semver.control
#########################################################################################
#
# Layer "pg_embedding-build"
# compile pg_embedding extension
#
#########################################################################################
FROM build-deps AS pg_embedding-src
ARG PG_VERSION
# This is our extension, support stopped in favor of pgvector
# TODO: deprecate it
WORKDIR /ext-src
RUN case "${PG_VERSION:?}" in \
"v14" | "v15") \
export PG_EMBEDDING_VERSION=0.3.5 \
export PG_EMBEDDING_CHECKSUM=0e95b27b8b6196e2cf0a0c9ec143fe2219b82e54c5bb4ee064e76398cbe69ae9 \
;; \
*) \
echo "pg_embedding not supported on this PostgreSQL version. Use pgvector instead." && exit 0;; \
esac && \
wget https://github.com/neondatabase/pg_embedding/archive/refs/tags/${PG_EMBEDDING_VERSION}.tar.gz -O pg_embedding.tar.gz && \
echo "${PG_EMBEDDING_CHECKSUM} pg_embedding.tar.gz" | sha256sum --check && \
mkdir pg_embedding-src && cd pg_embedding-src && tar xzf ../pg_embedding.tar.gz --strip-components=1 -C .
FROM pg-build AS pg_embedding-build
COPY --from=pg_embedding-src /ext-src/ /ext-src/
WORKDIR /ext-src/
RUN if [ -d pg_embedding-src ]; then \
cd pg_embedding-src && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install; \
fi
#########################################################################################
#
# Layer "pg build with nonroot user and cargo installed"
@@ -1647,7 +1614,6 @@ COPY --from=rdkit-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_uuidv7-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_roaringbitmap-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_semver-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_embedding-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=wal2json-build /usr/local/pgsql /usr/local/pgsql
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -1824,7 +1790,6 @@ COPY --from=pg_cron-src /ext-src/ /ext-src/
COPY --from=pg_uuidv7-src /ext-src/ /ext-src/
COPY --from=pg_roaringbitmap-src /ext-src/ /ext-src/
COPY --from=pg_semver-src /ext-src/ /ext-src/
#COPY --from=pg_embedding-src /ext-src/ /ext-src/
#COPY --from=wal2json-src /ext-src/ /ext-src/
COPY --from=pg_ivm-src /ext-src/ /ext-src/
COPY --from=pg_partman-src /ext-src/ /ext-src/

View File

@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
ERROR: must be owner of relation constraint_comments_tbl
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
index 442e7aff2b..525f732b03 100644
index d785f92561..16377e5ac9 100644
--- a/src/test/regress/expected/conversion.out
+++ b/src/test/regress/expected/conversion.out
@@ -8,7 +8,7 @@
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -587,16 +587,15 @@ index f551624afb..57f1e432d4 100644
SELECT *
INTO TABLE ramp
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
index 454db91ec0..01378d7081 100644
index 4cbdbdf84d..573362850e 100644
--- a/src/test/regress/expected/database.out
+++ b/src/test/regress/expected/database.out
@@ -1,8 +1,7 @@
@@ -1,8 +1,6 @@
CREATE DATABASE regression_tbd
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
+WARNING: you need to manually restart any running background workers after this command
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
BEGIN;
@@ -700,7 +699,7 @@ index 6ed50fdcfa..caa00a345d 100644
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 6b8c2f2414..8e13b7fa46 100644
index 84745b9f60..4883c12351 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -1112,7 +1111,7 @@ index 8475231735..0653946337 100644
DROP ROLE regress_passwd_sha_len1;
DROP ROLE regress_passwd_sha_len2;
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
index 5b9dba7b32..cc408dad42 100644
index 620fbe8c52..0570102357 100644
--- a/src/test/regress/expected/privileges.out
+++ b/src/test/regress/expected/privileges.out
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
@@ -1174,8 +1173,8 @@ index 5b9dba7b32..cc408dad42 100644
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
SET SESSION AUTHORIZATION regress_priv_user1;
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
SET SESSION AUTHORIZATION regress_priv_user3;
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
ERROR: permission denied to grant privileges as role "regress_priv_role"
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
@@ -1192,7 +1191,7 @@ index 5b9dba7b32..cc408dad42 100644
DROP ROLE regress_priv_role;
SET SESSION AUTHORIZATION regress_priv_user1;
SELECT session_user, current_user;
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -1201,7 +1200,7 @@ index 5b9dba7b32..cc408dad42 100644
-- Check that index expressions and predicates are run as the table's owner
-- A dummy index function checking current_user
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
drop cascades to function testns.priv_testproc(integer)
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -1212,7 +1211,7 @@ index 5b9dba7b32..cc408dad42 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
DROP USER regress_priv_user8; -- does not exist
ERROR: role "regress_priv_user8" does not exist
-- permissions with LOCK TABLE
@@ -1221,7 +1220,7 @@ index 5b9dba7b32..cc408dad42 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
GRANT SELECT ON lock_table TO regress_locktable_user;
@@ -2874,7 +2878,7 @@ DROP USER regress_locktable_user;
@@ -2881,7 +2885,7 @@ DROP USER regress_locktable_user;
-- pg_backend_memory_contexts.
-- switch to superuser
\c -
@@ -1230,7 +1229,7 @@ index 5b9dba7b32..cc408dad42 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
has_table_privilege
---------------------
@@ -2918,10 +2922,10 @@ RESET ROLE;
@@ -2925,10 +2929,10 @@ RESET ROLE;
-- clean up
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -1245,7 +1244,7 @@ index 5b9dba7b32..cc408dad42 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
SET SESSION AUTHORIZATION regress_group_direct_manager;
@@ -2950,9 +2954,9 @@ DROP ROLE regress_group_direct_manager;
@@ -2957,9 +2961,9 @@ DROP ROLE regress_group_direct_manager;
DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes
@@ -1841,7 +1840,7 @@ index 09a255649b..15895f0c53 100644
CREATE TABLE ruletest_t2 (x int);
CREATE VIEW ruletest_v1 WITH (security_invoker=true) AS
diff --git a/src/test/regress/expected/security_label.out b/src/test/regress/expected/security_label.out
index a8e01a6220..5a9cef4ede 100644
index a8e01a6220..83543b250a 100644
--- a/src/test/regress/expected/security_label.out
+++ b/src/test/regress/expected/security_label.out
@@ -6,8 +6,8 @@ SET client_min_messages TO 'warning';
@@ -1855,34 +1854,6 @@ index a8e01a6220..5a9cef4ede 100644
CREATE TABLE seclabel_tbl1 (a int, b text);
CREATE TABLE seclabel_tbl2 (x int, y text);
CREATE VIEW seclabel_view1 AS SELECT * FROM seclabel_tbl2;
@@ -19,21 +19,21 @@ ALTER TABLE seclabel_tbl2 OWNER TO regress_seclabel_user2;
-- Test of SECURITY LABEL statement without a plugin
--
SECURITY LABEL ON TABLE seclabel_tbl1 IS 'classified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL FOR 'dummy' ON TABLE seclabel_tbl1 IS 'classified'; -- fail
ERROR: security label provider "dummy" is not loaded
SECURITY LABEL ON TABLE seclabel_tbl1 IS '...invalid label...'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL ON TABLE seclabel_tbl3 IS 'unclassified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL FOR 'dummy' ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
ERROR: security label provider "dummy" is not loaded
SECURITY LABEL ON ROLE regress_seclabel_user1 IS '...invalid label...'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL ON ROLE regress_seclabel_user3 IS 'unclassified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
-- clean up objects
DROP FUNCTION seclabel_four();
DROP DOMAIN seclabel_domain;
diff --git a/src/test/regress/expected/select_into.out b/src/test/regress/expected/select_into.out
index b79fe9a1c0..e29fab88ab 100644
--- a/src/test/regress/expected/select_into.out
@@ -2413,10 +2384,10 @@ index e3e3bea709..fa86ddc326 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
index 9a65fca91f..58431a3056 100644
index b567a1a572..4d1ac2e631 100644
--- a/src/test/regress/sql/conversion.sql
+++ b/src/test/regress/sql/conversion.sql
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -2780,7 +2751,7 @@ index ae6841308b..47bc792e30 100644
SELECT *
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
index 0367c0e37a..a23b98c4bd 100644
index 46ad263478..eb05584ed5 100644
--- a/src/test/regress/sql/database.sql
+++ b/src/test/regress/sql/database.sql
@@ -1,8 +1,6 @@
@@ -2893,7 +2864,7 @@ index aa147b14a9..370e0dd570 100644
CREATE FOREIGN DATA WRAPPER dummy;
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 45c7a534cb..32dd26b8cd 100644
index 9f4210b26e..620d3fc87e 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -3246,7 +3217,7 @@ index 53e86b0b6c..0303fdfe96 100644
-- Check that the invalid secrets were re-hashed. A re-hashed secret
-- should not contain the original salt.
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
index 249df17a58..b258e7f26a 100644
index 259f1aedd1..6e1a3d17b7 100644
--- a/src/test/regress/sql/privileges.sql
+++ b/src/test/regress/sql/privileges.sql
@@ -24,18 +24,18 @@ RESET client_min_messages;
@@ -3308,7 +3279,7 @@ index 249df17a58..b258e7f26a 100644
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -3317,7 +3288,7 @@ index 249df17a58..b258e7f26a 100644
-- Check that index expressions and predicates are run as the table's owner
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -3328,7 +3299,7 @@ index 249df17a58..b258e7f26a 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
-- permissions with LOCK TABLE
@@ -3337,7 +3308,7 @@ index 249df17a58..b258e7f26a 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
@@ -1836,7 +1836,7 @@ DROP USER regress_locktable_user;
@@ -1839,7 +1839,7 @@ DROP USER regress_locktable_user;
-- switch to superuser
\c -
@@ -3346,7 +3317,7 @@ index 249df17a58..b258e7f26a 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
@@ -1856,10 +1856,10 @@ RESET ROLE;
@@ -1859,10 +1859,10 @@ RESET ROLE;
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -3361,7 +3332,7 @@ index 249df17a58..b258e7f26a 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
@@ -1881,9 +1881,9 @@ DROP ROLE regress_group_indirect_manager;
@@ -1884,9 +1884,9 @@ DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes

View File

@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
ERROR: must be owner of relation constraint_comments_tbl
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
index 442e7aff2b..525f732b03 100644
index d785f92561..16377e5ac9 100644
--- a/src/test/regress/expected/conversion.out
+++ b/src/test/regress/expected/conversion.out
@@ -8,7 +8,7 @@
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -587,16 +587,15 @@ index f551624afb..57f1e432d4 100644
SELECT *
INTO TABLE ramp
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
index 454db91ec0..01378d7081 100644
index 4cbdbdf84d..573362850e 100644
--- a/src/test/regress/expected/database.out
+++ b/src/test/regress/expected/database.out
@@ -1,8 +1,7 @@
@@ -1,8 +1,6 @@
CREATE DATABASE regression_tbd
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
+WARNING: you need to manually restart any running background workers after this command
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
BEGIN;
@@ -700,7 +699,7 @@ index 6ed50fdcfa..caa00a345d 100644
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 69994c98e3..129abcfbe8 100644
index fe6a1015f2..614b387b7d 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -1147,7 +1146,7 @@ index 924d6e001d..7fdda73439 100644
DROP ROLE regress_passwd_sha_len1;
DROP ROLE regress_passwd_sha_len2;
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
index 1296da0d57..f43fffa44c 100644
index e8c668e0a1..03be5c2120 100644
--- a/src/test/regress/expected/privileges.out
+++ b/src/test/regress/expected/privileges.out
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
@@ -1209,8 +1208,8 @@ index 1296da0d57..f43fffa44c 100644
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
SET SESSION AUTHORIZATION regress_priv_user1;
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
SET SESSION AUTHORIZATION regress_priv_user3;
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
ERROR: permission denied to grant privileges as role "regress_priv_role"
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
@@ -1227,7 +1226,7 @@ index 1296da0d57..f43fffa44c 100644
DROP ROLE regress_priv_role;
SET SESSION AUTHORIZATION regress_priv_user1;
SELECT session_user, current_user;
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -1236,7 +1235,7 @@ index 1296da0d57..f43fffa44c 100644
-- Check that index expressions and predicates are run as the table's owner
-- A dummy index function checking current_user
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
drop cascades to function testns.priv_testproc(integer)
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -1247,7 +1246,7 @@ index 1296da0d57..f43fffa44c 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
DROP USER regress_priv_user8; -- does not exist
ERROR: role "regress_priv_user8" does not exist
-- permissions with LOCK TABLE
@@ -1256,7 +1255,7 @@ index 1296da0d57..f43fffa44c 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
GRANT SELECT ON lock_table TO regress_locktable_user;
@@ -2888,7 +2892,7 @@ DROP USER regress_locktable_user;
@@ -2895,7 +2899,7 @@ DROP USER regress_locktable_user;
-- pg_backend_memory_contexts.
-- switch to superuser
\c -
@@ -1265,7 +1264,7 @@ index 1296da0d57..f43fffa44c 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
has_table_privilege
---------------------
@@ -2932,10 +2936,10 @@ RESET ROLE;
@@ -2939,10 +2943,10 @@ RESET ROLE;
-- clean up
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -1280,7 +1279,7 @@ index 1296da0d57..f43fffa44c 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
SET SESSION AUTHORIZATION regress_group_direct_manager;
@@ -2964,9 +2968,9 @@ DROP ROLE regress_group_direct_manager;
@@ -2971,9 +2975,9 @@ DROP ROLE regress_group_direct_manager;
DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes
@@ -1293,7 +1292,7 @@ index 1296da0d57..f43fffa44c 100644
CREATE SCHEMA regress_roleoption;
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
@@ -2995,9 +2999,9 @@ DROP ROLE regress_roleoption_protagonist;
@@ -3002,9 +3006,9 @@ DROP ROLE regress_roleoption_protagonist;
DROP ROLE regress_roleoption_donor;
DROP ROLE regress_roleoption_recipient;
-- MAINTAIN
@@ -2433,10 +2432,10 @@ index e3e3bea709..fa86ddc326 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
index 9a65fca91f..58431a3056 100644
index b567a1a572..4d1ac2e631 100644
--- a/src/test/regress/sql/conversion.sql
+++ b/src/test/regress/sql/conversion.sql
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -2800,7 +2799,7 @@ index ae6841308b..47bc792e30 100644
SELECT *
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
index 0367c0e37a..a23b98c4bd 100644
index 46ad263478..eb05584ed5 100644
--- a/src/test/regress/sql/database.sql
+++ b/src/test/regress/sql/database.sql
@@ -1,8 +1,6 @@
@@ -2913,7 +2912,7 @@ index aa147b14a9..370e0dd570 100644
CREATE FOREIGN DATA WRAPPER dummy;
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 2e710e419c..89cd481a54 100644
index 8c4e4c7c83..e946cd2119 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -3301,7 +3300,7 @@ index bb82aa4aa2..dd8a05e24d 100644
-- Check that the invalid secrets were re-hashed. A re-hashed secret
-- should not contain the original salt.
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
index 5880bc018d..27aa952b18 100644
index b7e1cb6cdd..6e5a2217f1 100644
--- a/src/test/regress/sql/privileges.sql
+++ b/src/test/regress/sql/privileges.sql
@@ -24,18 +24,18 @@ RESET client_min_messages;
@@ -3363,7 +3362,7 @@ index 5880bc018d..27aa952b18 100644
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -3372,7 +3371,7 @@ index 5880bc018d..27aa952b18 100644
-- Check that index expressions and predicates are run as the table's owner
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -3383,7 +3382,7 @@ index 5880bc018d..27aa952b18 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
-- permissions with LOCK TABLE
@@ -3392,7 +3391,7 @@ index 5880bc018d..27aa952b18 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
@@ -1851,7 +1851,7 @@ DROP USER regress_locktable_user;
@@ -1854,7 +1854,7 @@ DROP USER regress_locktable_user;
-- switch to superuser
\c -
@@ -3401,7 +3400,7 @@ index 5880bc018d..27aa952b18 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
@@ -1871,10 +1871,10 @@ RESET ROLE;
@@ -1874,10 +1874,10 @@ RESET ROLE;
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -3416,7 +3415,7 @@ index 5880bc018d..27aa952b18 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
@@ -1896,9 +1896,9 @@ DROP ROLE regress_group_indirect_manager;
@@ -1899,9 +1899,9 @@ DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes
@@ -3429,7 +3428,7 @@ index 5880bc018d..27aa952b18 100644
CREATE SCHEMA regress_roleoption;
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
@@ -1926,9 +1926,9 @@ DROP ROLE regress_roleoption_donor;
@@ -1929,9 +1929,9 @@ DROP ROLE regress_roleoption_donor;
DROP ROLE regress_roleoption_recipient;
-- MAINTAIN

View File

@@ -118,16 +118,18 @@ struct Cli {
#[arg(long)]
pub set_disk_quota_for_fs: Option<String>,
#[arg(short = 's', long = "spec", group = "spec")]
pub spec_json: Option<String>,
#[arg(short = 'S', long, group = "spec-path")]
pub spec_path: Option<OsString>,
#[arg(short = 'i', long, group = "compute-id")]
pub compute_id: String,
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
#[arg(
short = 'p',
long,
conflicts_with = "spec-path",
value_name = "CONTROL_PLANE_API_BASE_URL"
)]
pub control_plane_uri: Option<String>,
}
@@ -172,7 +174,6 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
live_config_allowed: cli_spec.live_config_allowed,
},
cli_spec.spec,
cli_spec.compute_ctl_config,
@@ -201,23 +202,12 @@ async fn init() -> Result<()> {
}
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
// First, try to get cluster spec from the cli argument
if let Some(ref spec_json) = cli.spec_json {
info!("got spec from cli argument {}", spec_json);
return Ok(CliSpecParams {
spec: Some(serde_json::from_str(spec_json)?),
compute_ctl_config: ComputeCtlConfig::default(),
live_config_allowed: false,
});
}
// Second, try to read it from the file if path is provided
// First, read spec from the path if provided
if let Some(ref spec_path) = cli.spec_path {
let file = File::open(Path::new(spec_path))?;
return Ok(CliSpecParams {
spec: Some(serde_json::from_reader(file)?),
compute_ctl_config: ComputeCtlConfig::default(),
live_config_allowed: true,
});
}
@@ -225,11 +215,12 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
panic!("must specify --control-plane-uri");
};
// If the spec wasn't provided in the CLI arguments, then retrieve it from
// the control plane
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
Ok(resp) => Ok(CliSpecParams {
spec: resp.0,
compute_ctl_config: resp.1,
live_config_allowed: true,
}),
Err(e) => {
error!(
@@ -247,7 +238,6 @@ struct CliSpecParams {
spec: Option<ComputeSpec>,
#[allow(dead_code)]
compute_ctl_config: ComputeCtlConfig,
live_config_allowed: bool,
}
fn deinit_and_exit(exit_code: Option<i32>) -> ! {

View File

@@ -98,13 +98,15 @@ pub async fn get_database_schema(
.kill_on_drop(true)
.spawn()?;
let stdout = cmd.stdout.take().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::Other, "Failed to capture stdout.")
})?;
let stdout = cmd
.stdout
.take()
.ok_or_else(|| std::io::Error::other("Failed to capture stdout."))?;
let stderr = cmd.stderr.take().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::Other, "Failed to capture stderr.")
})?;
let stderr = cmd
.stderr
.take()
.ok_or_else(|| std::io::Error::other("Failed to capture stderr."))?;
let mut stdout_reader = FramedRead::new(stdout, BytesCodec::new());
let stderr_reader = BufReader::new(stderr);
@@ -128,8 +130,7 @@ pub async fn get_database_schema(
}
});
return Err(SchemaDumpError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
return Err(SchemaDumpError::IO(std::io::Error::other(
"failed to start pg_dump",
)));
}

View File

@@ -93,20 +93,6 @@ pub struct ComputeNodeParams {
/// the address of extension storage proxy gateway
pub ext_remote_storage: Option<String>,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
/// - we start compute with some spec provided as argument
/// - we push new spec and it does reconfiguration
/// - but then something happens and compute pod / VM is destroyed,
/// so k8s controller starts it again with the **old** spec
///
/// and the same for empty computes:
/// - we started compute without any spec
/// - we push spec and it does configuration
/// - but then it is restarted without any spec again
pub live_config_allowed: bool,
}
/// Compute node info shared across several `compute_ctl` threads.
@@ -661,15 +647,8 @@ impl ComputeNode {
}
// Configure and start rsyslog for Postgres logs export
if self.has_feature(ComputeFeature::PostgresLogsExport) {
if let Some(ref project_id) = pspec.spec.cluster.cluster_id {
let host = PostgresLogsRsyslogConfig::default_host(project_id);
let conf = PostgresLogsRsyslogConfig::new(Some(&host));
configure_postgres_logs_export(conf)?;
} else {
warn!("not configuring rsyslog for Postgres logs export: project ID is missing")
}
}
let conf = PostgresLogsRsyslogConfig::new(pspec.spec.logs_export_host.as_deref());
configure_postgres_logs_export(conf)?;
// Launch remaining service threads
let _monitor_handle = launch_monitor(self);
@@ -1573,6 +1552,10 @@ impl ComputeNode {
});
}
// Reconfigure rsyslog for Postgres logs export
let conf = PostgresLogsRsyslogConfig::new(spec.logs_export_host.as_deref());
configure_postgres_logs_export(conf)?;
// Write new config
let pgdata_path = Path::new(&self.params.pgdata);
config::write_postgres_conf(

View File

@@ -7,7 +7,7 @@ use std::io::prelude::*;
use std::path::Path;
use compute_api::responses::TlsConfig;
use compute_api::spec::{ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, GenericOption};
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use crate::pg_helpers::{
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
@@ -255,7 +255,7 @@ pub fn write_postgres_conf(
// We need Postgres to send logs to rsyslog so that we can forward them
// further to customers' log aggregation systems.
if spec.features.contains(&ComputeFeature::PostgresLogsExport) {
if spec.logs_export_host.is_some() {
writeln!(file, "log_destination='stderr,syslog'")?;
}

View File

@@ -6,20 +6,15 @@ use axum_extra::{
TypedHeader,
headers::{Authorization, authorization::Bearer},
};
use compute_api::requests::ComputeClaims;
use futures::future::BoxFuture;
use http::{Request, Response, StatusCode};
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
use serde::Deserialize;
use tower_http::auth::AsyncAuthorizeRequest;
use tracing::warn;
use crate::http::{JsonResponse, extract::RequestId};
#[derive(Clone, Debug, Deserialize)]
pub(in crate::http) struct Claims {
compute_id: String,
}
#[derive(Clone, Debug)]
pub(in crate::http) struct Authorize {
compute_id: String,
@@ -112,7 +107,11 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
impl Authorize {
/// Verify the token using the JSON Web Key set and return the token data.
fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result<TokenData<Claims>> {
fn verify(
jwks: &JwkSet,
token: &str,
validation: &Validation,
) -> Result<TokenData<ComputeClaims>> {
for jwk in jwks.keys.iter() {
let decoding_key = match DecodingKey::from_jwk(jwk) {
Ok(key) => key,
@@ -127,7 +126,7 @@ impl Authorize {
}
};
match jsonwebtoken::decode::<Claims>(token, &decoding_key, validation) {
match jsonwebtoken::decode::<ComputeClaims>(token, &decoding_key, validation) {
Ok(data) => return Ok(data),
Err(e) => {
warn!(

View File

@@ -306,36 +306,6 @@ paths:
schema:
$ref: "#/components/schemas/GenericError"
/configure_telemetry:
post:
tags:
- Configure
summary: Configure rsyslog
description: |
This API endpoint configures rsyslog to forward Postgres logs
to a specified otel collector.
operationId: configureTelemetry
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
logs_export_host:
type: string
description: |
Hostname and the port of the otel collector. Leave empty to disable logs forwarding.
Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:54526
responses:
204:
description: "Telemetry configured successfully"
500:
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:
JWT:

View File

@@ -1,11 +1,9 @@
use std::sync::Arc;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use compute_api::requests::{ConfigurationRequest, ConfigureTelemetryRequest};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
use compute_api::spec::ComputeFeature;
use http::StatusCode;
use tokio::task;
use tracing::info;
@@ -13,7 +11,6 @@ use tracing::info;
use crate::compute::{ComputeNode, ParsedSpec};
use crate::http::JsonResponse;
use crate::http::extract::Json;
use crate::rsyslog::{PostgresLogsRsyslogConfig, configure_postgres_logs_export};
// Accept spec in JSON format and request compute configuration. If anything
// goes wrong after we set the compute status to `ConfigurationPending` and
@@ -25,13 +22,6 @@ pub(in crate::http) async fn configure(
State(compute): State<Arc<ComputeNode>>,
request: Json<ConfigurationRequest>,
) -> Response {
if !compute.params.live_config_allowed {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"live configuration is not allowed for this compute node".to_string(),
);
}
let pspec = match ParsedSpec::try_from(request.spec.clone()) {
Ok(p) => p,
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
@@ -95,25 +85,3 @@ pub(in crate::http) async fn configure(
JsonResponse::success(StatusCode::OK, body)
}
pub(in crate::http) async fn configure_telemetry(
State(compute): State<Arc<ComputeNode>>,
request: Json<ConfigureTelemetryRequest>,
) -> Response {
if !compute.has_feature(ComputeFeature::PostgresLogsExport) {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"Postgres logs export feature is not enabled".to_string(),
);
}
let conf = PostgresLogsRsyslogConfig::new(request.logs_export_host.as_deref());
if let Err(err) = configure_postgres_logs_export(conf) {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
}
Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::from(""))
.unwrap()
}

View File

@@ -87,7 +87,6 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/configure_telemetry", post(configure::configure_telemetry))
.route("/database_schema", get(database_schema::get_schema_dump))
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
.route("/insights", get(insights::get_insights))

View File

@@ -119,16 +119,9 @@ impl<'a> PostgresLogsRsyslogConfig<'a> {
};
Ok(config_content)
}
/// Returns the default host for otel collector that receives Postgres logs
pub fn default_host(project_id: &str) -> String {
format!(
"config-{}-collector.neon-telemetry.svc.cluster.local:10514",
project_id
)
}
}
/// Writes rsyslogd configuration for Postgres logs export and restarts rsyslog.
pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
let new_config = conf.build()?;
let current_config = PostgresLogsRsyslogConfig::current_config()?;
@@ -261,16 +254,5 @@ mod tests {
let res = conf.build();
assert!(res.is_err());
}
{
// Verify config with default host
let host = PostgresLogsRsyslogConfig::default_host("shy-breeze-123");
let conf = PostgresLogsRsyslogConfig::new(Some(&host));
let res = conf.build();
assert!(res.is_ok());
let conf_str = res.unwrap();
assert!(conf_str.contains(r#"shy-breeze-123"#));
assert!(conf_str.contains(r#"port="10514""#));
}
}
}

View File

@@ -20,8 +20,10 @@ use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::{
InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
SafekeeperConf,
ObjectStorageConf, SafekeeperConf,
};
use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT;
use control_plane::object_storage::ObjectStorage;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
@@ -39,7 +41,7 @@ use pageserver_api::controller_api::{
use pageserver_api::models::{
ShardParameters, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
@@ -91,6 +93,8 @@ enum NeonLocalCmd {
#[command(subcommand)]
Safekeeper(SafekeeperCmd),
#[command(subcommand)]
ObjectStorage(ObjectStorageCmd),
#[command(subcommand)]
Endpoint(EndpointCmd),
#[command(subcommand)]
Mappings(MappingsCmd),
@@ -454,6 +458,32 @@ enum SafekeeperCmd {
Restart(SafekeeperRestartCmdArgs),
}
#[derive(clap::Subcommand)]
#[clap(about = "Manage object storage")]
enum ObjectStorageCmd {
Start(ObjectStorageStartCmd),
Stop(ObjectStorageStopCmd),
}
#[derive(clap::Args)]
#[clap(about = "Start object storage")]
struct ObjectStorageStartCmd {
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
#[derive(clap::Args)]
#[clap(about = "Stop object storage")]
struct ObjectStorageStopCmd {
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
stop_mode: StopMode,
}
#[derive(clap::Args)]
#[clap(about = "Start local safekeeper")]
struct SafekeeperStartCmdArgs {
@@ -759,6 +789,7 @@ fn main() -> Result<()> {
}
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)),
NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
@@ -975,6 +1006,9 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
}
})
.collect(),
object_storage: ObjectStorageConf {
port: OBJECT_STORAGE_DEFAULT_PORT,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
@@ -1083,7 +1117,7 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
stripe_size: args
.shard_stripe_size
.map(ShardStripeSize)
.unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE),
.unwrap_or(DEFAULT_STRIPE_SIZE),
},
placement_policy: args.placement_policy.clone(),
config: tenant_conf,
@@ -1396,7 +1430,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
vec![(parsed.0, parsed.1.unwrap_or(5432))],
// If caller is telling us what pageserver to use, this is not a tenant which is
// full managed by storage controller, therefore not sharded.
ShardParameters::DEFAULT_STRIPE_SIZE,
DEFAULT_STRIPE_SIZE,
)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
@@ -1683,6 +1717,41 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> {
use ObjectStorageCmd::*;
let storage = ObjectStorage::from_env(env);
// In tests like test_forward_compatibility or test_graceful_cluster_restart
// old neon binaries (without object_storage) are present
if !storage.bin.exists() {
eprintln!(
"{} binary not found. Ignore if this is a compatibility test",
storage.bin
);
return Ok(());
}
match subcmd {
Start(ObjectStorageStartCmd { start_timeout }) => {
if let Err(e) = storage.start(start_timeout).await {
eprintln!("object_storage start failed: {e}");
exit(1);
}
}
Stop(ObjectStorageStopCmd { stop_mode }) => {
let immediate = match stop_mode {
StopMode::Fast => false,
StopMode::Immediate => true,
};
if let Err(e) = storage.stop(immediate) {
eprintln!("proxy stop failed: {e}");
exit(1);
}
}
};
Ok(())
}
async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
match subcmd {
StorageBrokerCmd::Start(args) => {
@@ -1777,6 +1846,13 @@ async fn handle_start_all_impl(
.map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
});
}
js.spawn(async move {
ObjectStorage::from_env(env)
.start(&retry_timeout)
.await
.map_err(|e| e.context("start object_storage"))
});
})();
let mut errors = Vec::new();
@@ -1874,6 +1950,11 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
let storage = ObjectStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("object_storage stop failed: {:#}", e);
}
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver.stop(immediate) {

View File

@@ -670,6 +670,7 @@ impl Endpoint {
reconfigure_concurrency: self.reconfigure_concurrency,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
logs_export_host: None::<String>,
};
// this strange code is needed to support respec() in tests

View File

@@ -10,6 +10,7 @@ mod background_process;
pub mod broker;
pub mod endpoint;
pub mod local_env;
pub mod object_storage;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;

View File

@@ -15,9 +15,10 @@ use clap::ValueEnum;
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use utils::auth::{Claims, encode_from_key_file};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -55,6 +56,7 @@ pub struct LocalEnv {
// used to issue tokens during e.g pg start
pub private_key_path: PathBuf,
pub public_key_path: PathBuf,
pub broker: NeonBroker,
@@ -68,6 +70,8 @@ pub struct LocalEnv {
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
pub control_plane_api: Url,
@@ -95,6 +99,7 @@ pub struct OnDiskConfig {
pub neon_distrib_dir: PathBuf,
pub default_tenant_id: Option<TenantId>,
pub private_key_path: PathBuf,
pub public_key_path: PathBuf,
pub broker: NeonBroker,
pub storage_controller: NeonStorageControllerConf,
#[serde(
@@ -103,6 +108,7 @@ pub struct OnDiskConfig {
)]
pub pageservers: Vec<PageServerConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Url>,
@@ -136,11 +142,18 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub generate_local_ssl_certs: bool,
}
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct ObjectStorageConf {
pub port: u16,
}
/// Broker config for cluster internal communication.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
@@ -398,6 +411,10 @@ impl LocalEnv {
self.pg_dir(pg_version, "lib")
}
pub fn object_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("object_storage")
}
pub fn pageserver_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("pageserver")
}
@@ -431,6 +448,10 @@ impl LocalEnv {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
pub fn object_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("object_storage")
}
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) {
Ok(conf)
@@ -582,6 +603,7 @@ impl LocalEnv {
neon_distrib_dir,
default_tenant_id,
private_key_path,
public_key_path,
broker,
storage_controller,
pageservers,
@@ -591,6 +613,7 @@ impl LocalEnv {
control_plane_compute_hook_api: _,
branch_name_mappings,
generate_local_ssl_certs,
object_storage,
} = on_disk_config;
LocalEnv {
base_data_dir: repopath.to_owned(),
@@ -598,6 +621,7 @@ impl LocalEnv {
neon_distrib_dir,
default_tenant_id,
private_key_path,
public_key_path,
broker,
storage_controller,
pageservers,
@@ -606,6 +630,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings,
generate_local_ssl_certs,
object_storage,
}
};
@@ -705,6 +730,7 @@ impl LocalEnv {
neon_distrib_dir: self.neon_distrib_dir.clone(),
default_tenant_id: self.default_tenant_id,
private_key_path: self.private_key_path.clone(),
public_key_path: self.public_key_path.clone(),
broker: self.broker.clone(),
storage_controller: self.storage_controller.clone(),
pageservers: vec![], // it's skip_serializing anyway
@@ -714,6 +740,7 @@ impl LocalEnv {
control_plane_compute_hook_api: None,
branch_name_mappings: self.branch_name_mappings.clone(),
generate_local_ssl_certs: self.generate_local_ssl_certs,
object_storage: self.object_storage.clone(),
},
)
}
@@ -730,7 +757,7 @@ impl LocalEnv {
}
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
pub fn generate_auth_token<S: Serialize>(&self, claims: &S) -> anyhow::Result<String> {
let private_key_path = self.get_private_key_path();
let key_data = fs::read(private_key_path)?;
encode_from_key_file(claims, &key_data)
@@ -797,6 +824,7 @@ impl LocalEnv {
control_plane_api,
generate_local_ssl_certs,
control_plane_hooks_api,
object_storage,
} = conf;
// Find postgres binaries.
@@ -828,6 +856,7 @@ impl LocalEnv {
)
.context("generate auth keys")?;
let private_key_path = PathBuf::from("auth_private_key.pem");
let public_key_path = PathBuf::from("auth_public_key.pem");
// create the runtime type because the remaining initialization code below needs
// a LocalEnv instance op operation
@@ -838,6 +867,7 @@ impl LocalEnv {
neon_distrib_dir,
default_tenant_id: Some(default_tenant_id),
private_key_path,
public_key_path,
broker,
storage_controller: storage_controller.unwrap_or_default(),
pageservers: pageservers.iter().map(Into::into).collect(),
@@ -846,6 +876,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings: Default::default(),
generate_local_ssl_certs,
object_storage,
};
if generate_local_ssl_certs {
@@ -873,8 +904,13 @@ impl LocalEnv {
.context("pageserver init failed")?;
}
ObjectStorage::from_env(&env)
.init()
.context("object storage init failed")?;
// setup remote remote location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
env.persist_config()
}

View File

@@ -0,0 +1,107 @@
use crate::background_process::{self, start_process, stop_process};
use crate::local_env::LocalEnv;
use anyhow::anyhow;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub struct ObjectStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub port: u16,
}
impl ObjectStorage {
pub fn from_env(env: &LocalEnv) -> ObjectStorage {
ObjectStorage {
bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
port: env.object_storage.port,
}
}
fn config_path(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.json")
}
fn listen_addr(&self) -> Utf8PathBuf {
format!("127.0.0.1:{}", self.port).into()
}
pub fn init(&self) -> Result<()> {
println!("Initializing object storage in {:?}", self.data_dir);
let parent = self.data_dir.parent().unwrap();
#[derive(serde::Serialize)]
struct Cfg {
listen: Utf8PathBuf,
pemfile: Utf8PathBuf,
local_path: Utf8PathBuf,
r#type: String,
}
let cfg = Cfg {
listen: self.listen_addr(),
pemfile: parent.join(self.pemfile.clone()),
local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
r#type: "LocalFs".to_string(),
};
std::fs::create_dir_all(self.config_path().parent().unwrap())?;
std::fs::write(self.config_path(), serde_json::to_string(&cfg)?)
.context("write object storage config")?;
Ok(())
}
pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
println!("Starting s3 proxy at {}", self.listen_addr());
std::io::stdout().flush().context("flush stdout")?;
let process_status_check = || async {
tokio::time::sleep(Duration::from_millis(500)).await;
let res = reqwest::Client::new()
.get(format!("http://{}/metrics", self.listen_addr()))
.send()
.await;
match res {
Ok(response) if response.status().is_success() => Ok(true),
Ok(_) => Err(anyhow!("Failed to query /metrics")),
Err(e) => Err(anyhow!("Failed to check node status: {e}")),
}
};
let res = start_process(
"object_storage",
&self.data_dir.clone().into_std_path_buf(),
&self.bin.clone().into_std_path_buf(),
vec![self.config_path().to_string()],
vec![("RUST_LOG".into(), "debug".into())],
background_process::InitialPidFile::Create(self.pid_file()),
retry_timeout,
process_status_check,
)
.await;
if res.is_err() {
eprintln!("Logs:\n{}", std::fs::read_to_string(self.log_file())?);
}
res
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
stop_process(immediate, "object_storage", &self.pid_file())
}
fn log_file(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.log")
}
fn pid_file(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.pid")
}
}

View File

@@ -318,7 +318,7 @@ impl PageServerNode {
self.conf.id, datadir,
)
})?;
let args = vec!["-D", datadir_path_str];
let args = vec!["-D", datadir_path_str, "--dev"];
background_process::start_process(
"pageserver",

View File

@@ -162,6 +162,7 @@ impl SafekeeperNode {
listen_http,
"--availability-zone".to_owned(),
availability_zone,
"--dev".to_owned(),
];
if let Some(pg_tenant_only_port) = self.conf.pg_tenant_only_port {
let listen_pg_tenant_only = format!("{}:{}", self.listen_addr, pg_tenant_only_port);

View File

@@ -941,7 +941,7 @@ async fn main() -> anyhow::Result<()> {
let mut node_to_fill_descs = Vec::new();
for desc in node_descs {
let to_drain = nodes.iter().any(|id| *id == desc.id);
let to_drain = nodes.contains(&desc.id);
if to_drain {
node_to_drain_descs.push(desc);
} else {

View File

@@ -151,7 +151,7 @@ Example body:
```
{
"tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
"stripe_size": 32768,
"stripe_size": 2048,
"shards": [
{"node_id": 344, "shard_number": 0},
{"node_id": 722, "shard_number": 1},

View File

@@ -1,22 +0,0 @@
[package]
name = "aztraffic"
version = "0.0.0"
edition.workspace = true
license.workspace = true
publish = false
[dependencies]
anyhow = "1.0.97"
aws-config = "1.6.1"
aws-sdk-athena = "1.68.0"
aws-sdk-ec2 = "1.121.0"
aws-sdk-eks = "1.82.0"
aws-sdk-glue = "1.88.0"
aws-sdk-lambda = "1.75.0"
aws-sdk-scheduler = "1.64.0"
aws-sdk-sfn = "1.68.0"
aws-sdk-sts = "1.65.0"
clap = { version = "4.5.35", features = ["derive", "env"] }
tokio = { version = "1.44.1", features = ["full"] }
serde = "1.0.219"
serde_json = { version = "1.0.140", features = ["preserve_order"] }

View File

@@ -1,794 +0,0 @@
use std::fs;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_sdk_ec2::types::{
DestinationFileFormat, DestinationOptionsRequest, FlowLogsResourceType, LogDestinationType,
TrafficType,
};
use aws_sdk_glue::primitives::Blob;
use aws_sdk_glue::types::{Column, DatabaseInput, SerDeInfo, StorageDescriptor, TableInput};
use aws_sdk_lambda::types::{Environment, FunctionCode, Runtime};
use aws_sdk_scheduler::types::{
DeadLetterConfig, FlexibleTimeWindow, FlexibleTimeWindowMode, RetryPolicy, Target,
};
use aws_sdk_sfn::types::{CloudWatchLogsLogGroup, LogDestination, LogLevel, LoggingConfiguration};
use clap::Parser;
use serde_json::json;
#[derive(Parser, Clone, Debug)]
struct Args {
#[arg(long, value_name = "id")]
account_id: String,
#[arg(long, value_name = "region")]
region: String,
#[arg(long, value_name = "cluster")]
cluster: String,
#[arg(long, value_name = "id")]
vpc_id: Vec<String>,
#[arg(long, value_name = "arn")]
log_group_arn: String,
#[arg(long, value_name = "name")]
pod_info_s3_bucket_name: String,
#[arg(
long,
value_name = "path",
default_value = "CrossAZTraffic/pod_info_dumper/pod_info.csv"
)]
pod_info_s3_bucket_key: String,
#[arg(long, value_name = "uri")]
pod_info_s3_bucket_uri: String,
#[arg(long, value_name = "uri")]
vpc_flow_logs_s3_bucket_uri: String,
#[arg(long, value_name = "uri")]
results_s3_bucket_uri: String,
#[arg(
long,
value_name = "name",
default_value = "./target/lambda/pod_info_dumper/bootstrap.zip"
)]
lambda_zipfile_path: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-podinfo-function"
)]
lambda_function_name: String,
#[arg(long, value_name = "arn")]
lambda_role_arn: String,
#[arg(long, value_name = "name")]
glue_database_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-podinfo-table"
)]
glue_pod_info_table_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-vpcflowlogs-table"
)]
glue_vpc_flow_logs_table_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-results-table"
)]
glue_results_table_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-trigger-schedule"
)]
schedule_name: String,
#[arg(long, value_name = "minutes", default_value_t = 60)]
schedule_interval_minutes: usize,
#[arg(long, value_name = "arn")]
schedule_target_state_machine_arn: String,
#[arg(long, value_name = "arn")]
schedule_target_role_arn: String,
#[arg(long, value_name = "arn")]
schedule_dead_letter_queue_arn: Option<String>,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-combine-query"
)]
athena_query_name: String,
#[arg(long, value_name = "uri")]
vpcflowlogs_destination_s3_bucket_uri: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-statemachine"
)]
statemachine_name: String,
#[arg(long, value_name = "arn")]
statemachine_role_arn: String,
#[arg(long, value_name = "uri")]
athena_results_s3_bucket_uri: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
eprintln!("{args:#?}");
// TODO: athena results bucket + lifecycle config
// TODO: iam role split
// TODO: iam policy
// TODO: clusterrole + binding
// TODO: eks mapping
// TODO: log group
// TODO: dlq
let sdk_config = create_sdk_config(&args).await?;
LambdaFunction {
local_zipfile_path: args.lambda_zipfile_path,
function_name: args.lambda_function_name.clone(),
role_arn: args.lambda_role_arn,
account_id: args.account_id,
region: args.region,
cluster: args.cluster,
s3_bucket_name: args.pod_info_s3_bucket_name,
s3_bucket_key: args.pod_info_s3_bucket_key,
}
.create(&sdk_config)
.await?;
GlueDatabase {
database_name: args.glue_database_name.clone(),
pod_info_table_name: args.glue_pod_info_table_name.clone(),
pod_info_s3_bucket_uri: args.pod_info_s3_bucket_uri,
vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name.clone(),
vpc_flow_logs_s3_bucket_uri: args.vpc_flow_logs_s3_bucket_uri,
results_table_name: args.glue_results_table_name.clone(),
results_s3_bucket_uri: args.results_s3_bucket_uri,
}
.create(&sdk_config)
.await?;
let named_query_id = AthenaQuery {
query_name: args.athena_query_name,
glue_database: args.glue_database_name.clone(),
invocation_frequency: args.schedule_interval_minutes,
athena_results_table_name: args.glue_results_table_name,
vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name,
pod_info_table_name: args.glue_pod_info_table_name,
}
.create(&sdk_config)
.await?;
StateMachine {
name: args.statemachine_name,
role_arn: args.statemachine_role_arn,
named_query_id,
glue_database: args.glue_database_name,
lambda_function_name: args.lambda_function_name,
athena_results_s3_bucket_uri: args.athena_results_s3_bucket_uri,
log_group_arn: args.log_group_arn,
}
.create(&sdk_config)
.await?;
Schedule {
name: args.schedule_name,
interval_minutes: args.schedule_interval_minutes,
dead_letter_queue_arn: args.schedule_dead_letter_queue_arn,
target_role_arn: args.schedule_target_role_arn,
target_state_machine_arn: args.schedule_target_state_machine_arn,
}
.create(&sdk_config)
.await?;
let flow_log_ids = VpcFlowLogs {
vpc_ids: args.vpc_id,
destination_s3_bucket_uri: args.vpcflowlogs_destination_s3_bucket_uri,
}
.create(&sdk_config)
.await?;
println!("VPC flow log IDs: {:?}", flow_log_ids.as_slice());
Ok(())
}
async fn create_sdk_config(args: &Args) -> anyhow::Result<aws_config::SdkConfig> {
let region = aws_config::Region::new(args.region.to_owned());
let credentials_provider = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials_provider)
.load()
.await)
}
struct LambdaFunction {
local_zipfile_path: String,
function_name: String,
role_arn: String,
account_id: String,
region: String,
cluster: String,
s3_bucket_name: String,
s3_bucket_key: String,
}
impl LambdaFunction {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let code = fs::read(&self.local_zipfile_path)?;
let client = aws_sdk_lambda::Client::new(sdk_config);
client
.delete_function()
.function_name(&self.function_name)
.send()
.await
.ok();
client
.create_function()
.function_name(&self.function_name)
.runtime(Runtime::Providedal2023)
.handler("bootstrap")
.role(&self.role_arn)
.code(FunctionCode::builder().zip_file(Blob::new(code)).build())
.timeout(60)
.environment(
Environment::builder()
.set_variables(Some(
[
("NEON_ACCOUNT_ID", self.account_id.as_str()),
("NEON_REGION", self.region.as_str()),
("NEON_CLUSTER", self.cluster.as_str()),
("NEON_S3_BUCKET_NAME", self.s3_bucket_name.as_str()),
("NEON_S3_BUCKET_KEY", self.s3_bucket_key.as_str()),
("AWS_LAMBDA_LOG_FORMAT", "JSON"),
("AWS_LAMBDA_LOG_LEVEL", "INFO"),
]
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect(),
))
.build(),
)
.send()
.await?;
Ok(())
}
}
struct VpcFlowLogs {
vpc_ids: Vec<String>,
destination_s3_bucket_uri: String,
}
impl VpcFlowLogs {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<Vec<String>> {
let ec2_client = aws_sdk_ec2::Client::new(sdk_config);
let flow_logs = ec2_client
.create_flow_logs()
.resource_type(FlowLogsResourceType::Vpc)
.set_resource_ids(Some(self.vpc_ids.clone()))
.traffic_type(TrafficType::All)
.log_destination_type(LogDestinationType::S3)
.log_destination(&self.destination_s3_bucket_uri)
.destination_options(
DestinationOptionsRequest::builder()
.file_format(DestinationFileFormat::Parquet)
.hive_compatible_partitions(false)
.per_hour_partition(true)
.build(),
)
.log_format("${region} ${az-id} ${vpc-id} ${flow-direction} ${pkt-srcaddr} ${pkt-dstaddr} ${srcport} ${dstport} ${start} ${bytes}")
.send()
.await?;
if let Some(unsuccessful) = flow_logs
.unsuccessful
.as_ref()
.and_then(|v| if v.is_empty() { None } else { Some(v) })
{
anyhow::bail!("VPC flow log creation unsuccessful: {unsuccessful:?}");
}
Ok(flow_logs.flow_log_ids().iter().cloned().collect())
}
}
struct GlueDatabase {
database_name: String,
pod_info_table_name: String,
pod_info_s3_bucket_uri: String,
vpc_flow_logs_table_name: String,
vpc_flow_logs_s3_bucket_uri: String,
results_table_name: String,
results_s3_bucket_uri: String,
}
impl GlueDatabase {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let glue_client = aws_sdk_glue::Client::new(sdk_config);
let db = DatabaseInput::builder().name(&self.database_name).build()?;
glue_client
.create_database()
.database_input(db.clone())
.send()
.await?;
let pod_info_columns = &[
Column::builder()
.name("namespace")
.r#type("string")
.build()?,
Column::builder().name("name").r#type("string").build()?,
Column::builder().name("ip").r#type("string").build()?,
Column::builder()
.name("creation_time")
.r#type("timestamp")
.build()?,
Column::builder().name("node").r#type("string").build()?,
Column::builder().name("az").r#type("string").build()?,
];
glue_client
.create_table()
.database_name(db.name())
.table_input(
TableInput::builder()
.name(&self.pod_info_table_name)
.storage_descriptor(
StorageDescriptor::builder()
.location(&self.pod_info_s3_bucket_uri)
.compressed(false)
.set_columns(Some(pod_info_columns.into_iter().cloned().collect()))
.input_format("org.apache.hadoop.mapred.TextInputFormat")
.output_format(
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
)
.serde_info(
SerDeInfo::builder()
.serialization_library(
"org.apache.hadoop.hive.serde2.OpenCSVSerde",
)
.parameters("separatorChar", ",")
.parameters("quoteChar", "`")
.parameters("escapeChar", r"\")
.build(),
)
.build(),
)
.table_type("EXTERNAL_TABLE")
.parameters("classification", "csv")
.parameters("skip.header.line.count", "1")
.retention(0)
.build()?,
)
.send()
.await?;
let vpc_flow_logs_columns = &[
Column::builder().name("region").r#type("string").build()?,
Column::builder().name("az_id").r#type("string").build()?,
Column::builder().name("vpc_id").r#type("string").build()?,
Column::builder()
.name("flow_direction")
.r#type("string")
.build()?,
Column::builder()
.name("pkt_srcaddr")
.r#type("string")
.build()?,
Column::builder()
.name("pkt_dstaddr")
.r#type("string")
.build()?,
Column::builder().name("srcport").r#type("int").build()?,
Column::builder().name("dstport").r#type("int").build()?,
Column::builder().name("start").r#type("bigint").build()?,
Column::builder().name("bytes").r#type("bigint").build()?,
];
glue_client
.create_table()
.database_name(db.name())
.table_input(
TableInput::builder()
.name(&self.vpc_flow_logs_table_name)
.storage_descriptor(
StorageDescriptor::builder()
.location(&self.vpc_flow_logs_s3_bucket_uri)
.compressed(false)
.set_columns(Some(vpc_flow_logs_columns.into_iter().cloned().collect()))
.input_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
)
.output_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
)
.serde_info(
SerDeInfo::builder()
.serialization_library(
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
)
.parameters("serialization.format", "1")
.build(),
)
.build(),
)
.table_type("EXTERNAL_TABLE")
.parameters("classification", "parquet")
.retention(0)
.build()?,
)
.send()
.await?;
let athena_results_columns = &[
Column::builder().name("time").r#type("timestamp").build()?,
Column::builder().name("traffic").r#type("string").build()?,
Column::builder()
.name("total_bytes")
.r#type("bigint")
.build()?,
];
glue_client
.create_table()
.database_name(db.name())
.table_input(
TableInput::builder()
.name(&self.results_table_name)
.storage_descriptor(
StorageDescriptor::builder()
.location(&self.results_s3_bucket_uri)
.compressed(false)
.set_columns(Some(athena_results_columns.into_iter().cloned().collect()))
.input_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
)
.output_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
)
.serde_info(
SerDeInfo::builder()
.serialization_library(
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
)
.parameters("serialization.format", "1")
.build(),
)
.build(),
)
.table_type("EXTERNAL_TABLE")
.parameters("classification", "parquet")
.retention(0)
.build()?,
)
.send()
.await?;
Ok(())
}
}
struct AthenaQuery {
query_name: String,
glue_database: String,
invocation_frequency: usize,
athena_results_table_name: String,
vpc_flow_logs_table_name: String,
pod_info_table_name: String,
}
impl AthenaQuery {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<String> {
let Self {
athena_results_table_name,
vpc_flow_logs_table_name,
pod_info_table_name,
invocation_frequency,
..
} = self;
let query_string = format!(
r#"
INSERT INTO "{athena_results_table_name}"
WITH
ip_addresses_and_az_mapping AS (
SELECT
DISTINCT pkt_srcaddr AS ipaddress,
az_id
FROM "{vpc_flow_logs_table_name}"
WHERE flow_direction = 'egress'
AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute))
),
egress_flows_of_pods_with_status AS (
SELECT
"{pod_info_table_name}".name AS srcpodname,
pkt_srcaddr AS srcaddr,
pkt_dstaddr AS dstaddr,
"{vpc_flow_logs_table_name}".az_id AS srcazid,
bytes,
start
FROM "{vpc_flow_logs_table_name}"
INNER JOIN "{pod_info_table_name}" ON "{vpc_flow_logs_table_name}".pkt_srcaddr = "{pod_info_table_name}".ip
WHERE flow_direction = 'egress'
AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute))
),
cross_az_traffic_by_pod AS (
SELECT
srcaddr,
srcpodname,
dstaddr,
"{pod_info_table_name}".name AS dstpodname,
srcazid,
ip_addresses_and_az_mapping.az_id AS dstazid,
bytes,
start
FROM egress_flows_of_pods_with_status
INNER JOIN "{pod_info_table_name}" ON dstaddr = "{pod_info_table_name}".ip
LEFT JOIN ip_addresses_and_az_mapping ON dstaddr = ipaddress
WHERE ip_addresses_and_az_mapping.az_id != srcazid
)
SELECT
date_trunc('MINUTE', from_unixtime(start)) AS time,
CONCAT(srcpodname, ' -> ', dstpodname) AS traffic,
SUM(bytes) AS total_bytes
FROM cross_az_traffic_by_pod
GROUP BY date_trunc('MINUTE', from_unixtime(start)), CONCAT(srcpodname, ' -> ', dstpodname)
ORDER BY time, total_bytes DESC
"#
);
let athena_client = aws_sdk_athena::Client::new(sdk_config);
let res = athena_client
.create_named_query()
.name(&self.query_name)
.database(&self.glue_database)
.query_string(query_string)
.send()
.await?;
Ok(res.named_query_id.unwrap())
}
}
struct StateMachine {
name: String,
role_arn: String,
named_query_id: String,
glue_database: String,
lambda_function_name: String,
athena_results_s3_bucket_uri: String,
log_group_arn: String,
}
impl StateMachine {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let sfn_client = aws_sdk_sfn::Client::new(sdk_config);
sfn_client
.create_state_machine()
.name(&self.name)
.role_arn(&self.role_arn)
.logging_configuration(
LoggingConfiguration::builder()
.level(LogLevel::All)
.destinations(
LogDestination::builder()
.cloud_watch_logs_log_group(
CloudWatchLogsLogGroup::builder()
.log_group_arn(&self.log_group_arn)
.build(),
)
.build(),
)
.build(),
)
.definition(
json!(
{
"StartAt": "Invoke",
"States": {
"Invoke": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Output": "{% $states.result.Payload %}",
"Arguments": {
"FunctionName": self.lambda_function_name,
"Payload": json!({
"detail-type": "Scheduled Event",
"source": "aws.events",
"detail": {}
}).to_string()
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "Check"
},
"Check": {
"Type": "Choice",
"Choices": [
{
"Next": "GetNamedQuery",
"Condition": "{% $states.input.statusCode = 200 %}"
}
],
"Default": "Fail"
},
"GetNamedQuery": {
"Type": "Task",
"Arguments": {
"NamedQueryId": self.named_query_id
},
"Resource": "arn:aws:states:::aws-sdk:athena:getNamedQuery",
"Output": {
"QueryString": "{% $states.result.NamedQuery.QueryString %}"
},
"Next": "StartQueryExecution"
},
"StartQueryExecution": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Arguments": {
"QueryString": "{% $states.input.QueryString %}",
"QueryExecutionContext": {
"Database": self.glue_database
},
"ResultConfiguration": {
"OutputLocation": self.athena_results_s3_bucket_uri
},
"WorkGroup": "primary"
},
"End": true
},
"Fail": {
"Type": "Fail"
}
},
"QueryLanguage": "JSONata"
}
)
.to_string(),
)
.send()
.await?;
Ok(())
}
}
struct Schedule {
name: String,
interval_minutes: usize,
target_state_machine_arn: String,
target_role_arn: String,
dead_letter_queue_arn: Option<String>,
}
impl Schedule {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let sched_client = aws_sdk_scheduler::Client::new(sdk_config);
sched_client
.create_schedule()
.name(&self.name)
.schedule_expression(format!("rate({} minute)", self.interval_minutes))
.flexible_time_window(
FlexibleTimeWindow::builder()
.mode(FlexibleTimeWindowMode::Off)
.build()?,
)
.target(
Target::builder()
.arn(&self.target_state_machine_arn)
.role_arn(&self.target_role_arn)
.input(
json!({
"detail-type": "Scheduled Event",
"source": "aws.events",
"detail": {}
})
.to_string(),
)
.retry_policy(
RetryPolicy::builder()
.maximum_retry_attempts(0)
.maximum_event_age_in_seconds(60)
.build(),
)
.set_dead_letter_config(
self.dead_letter_queue_arn
.as_ref()
.map(|arn| DeadLetterConfig::builder().arn(arn).build()),
)
.build()?,
)
.send()
.await?;
Ok(())
}
}
struct KubernetesRoles {
region: String,
cluster: String,
k8s_role_prefix: String,
lambda_role_arn: String,
}
impl KubernetesRoles {
fn print(&self) -> anyhow::Result<()> {
let Self {
region,
cluster,
k8s_role_prefix,
lambda_role_arn,
} = self;
let yaml = format!(
r#"
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {k8s_role_prefix}-clusterrole
rules:
- apiGroups:
- ""
resources: ["nodes", "namespaces", "pods"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {k8s_role_prefix}-binding
subjects:
- kind: Group
name: {k8s_role_prefix}-group
apiGroup: rbac.authorization.k8s.io
roleRef:
kind: ClusterRole
name: {k8s_role_prefix}-clusterrole
apiGroup: rbac.authorization.k8s.io
"#
);
let eksctl = format!(
r#"eksctl create iamidentitymapping \
--region "{region}"
--cluster "{cluster}" \
--arn "{lambda_role_arn}" \
--username "{k8s_role_prefix}-binding" \
--group "{k8s_role_prefix}-group"
"#
);
Ok(())
}
}

View File

@@ -1,27 +0,0 @@
[package]
name = "pod_info_dumper"
version = "0.0.0"
edition = "2024"
publish = false
[dependencies]
aws_lambda_events = { version = "0.16.0", default-features = false, features = ["eventbridge"] }
aws-config = { workspace = true }
aws-sdk-eks = "1.75.0"
aws-sdk-s3 = { workspace = true }
aws-sdk-sts = "1.65.0"
aws-sigv4 = "1.3.0"
base64 = { version = "0.22.1" }
csv = { version = "1.3.1", default-features = false }
http = { workspace = true }
k8s-openapi = { version = "0.24.0", default-features = false, features = ["v1_31"] }
kube = { version = "0.99.0", default-features = false, features = ["client", "rustls-tls"] }
lambda_runtime = { version = "0.13.0", default-features = false, features = ["tracing"] }
rustls = { version = "0.23.25" }
rustls-pemfile = { workspace = true }
secrecy = "0.10.3"
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true, features = ["asm"] }
tokio = { workspace = true, features = ["macros"] }
tracing = { workspace = true, features = ["max_level_debug", "release_max_level_info"] }

View File

@@ -1,8 +0,0 @@
# pod_info_dumper
An event-triggered AWS lambda function that writes the list of all pods with
node information to a CSV file in S3.
```shell
cargo lambda build -p pod_info_dumper --output-format Zip --x86-64 --profile release-lambda-function
```

View File

@@ -1,420 +0,0 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use std::{env, io};
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::retry::RetryConfig;
use aws_lambda_events::event::eventbridge::EventBridgeEvent;
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
use aws_sdk_s3::types::ChecksumAlgorithm;
use aws_sdk_sts::config::ProvideCredentials;
use aws_sigv4::http_request::{
SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
};
use aws_sigv4::sign::v4;
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD;
use base64::prelude::*;
use k8s_openapi::api::core::v1::{Node, Pod};
use k8s_openapi::chrono::SecondsFormat;
use kube::api::{Api, ListParams, ResourceExt};
use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
use secrecy::SecretString;
use serde::ser::SerializeMap;
use sha2::{Digest as _, Sha256};
const AZ_LABEL: &str = "topology.kubernetes.io/zone";
#[derive(Debug)]
struct Config {
aws_account_id: String,
s3_bucket: S3BucketConfig,
eks_cluster: EksClusterConfig,
}
#[derive(Debug)]
struct S3BucketConfig {
region: String,
name: String,
key: String,
}
impl S3BucketConfig {
#[tracing::instrument(skip_all, err)]
async fn create_sdk_config(&self) -> Result<aws_config::SdkConfig, Error> {
let region = aws_config::Region::new(self.region.clone());
let credentials_provider = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials_provider)
.load()
.await)
}
}
#[derive(Debug)]
struct EksClusterConfig {
region: String,
name: String,
}
impl EksClusterConfig {
#[tracing::instrument(skip_all, err)]
async fn create_sdk_config(&self) -> Result<aws_config::SdkConfig, Error> {
let region = aws_config::Region::new(self.region.clone());
let credentials_provider = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials_provider)
.load()
.await)
}
}
#[tokio::main]
pub async fn start() -> Result<(), Error> {
tracing::init_default_subscriber();
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.unwrap();
tracing::info!("function handler started");
let config = Config {
aws_account_id: env::var("NEON_ACCOUNT_ID")?,
s3_bucket: S3BucketConfig {
region: env::var("NEON_REGION")?,
name: env::var("NEON_S3_BUCKET_NAME")?,
key: env::var("NEON_S3_BUCKET_KEY")?,
},
eks_cluster: EksClusterConfig {
region: env::var("NEON_REGION")?,
name: env::var("NEON_CLUSTER")?,
},
};
run(service_fn(async |event: LambdaEvent<EventBridgeEvent<serde_json::Value>>| -> Result<StatusResponse, Error> {
function_handler(event, &config).await
}))
.await
}
#[derive(Debug, PartialEq)]
struct StatusResponse {
status_code: http::StatusCode,
body: Cow<'static, str>,
}
impl StatusResponse {
fn ok() -> Self {
StatusResponse {
status_code: http::StatusCode::OK,
body: "OK".into(),
}
}
}
impl serde::Serialize for StatusResponse {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let mut serializer = serializer.serialize_map(None)?;
serializer.serialize_entry("statusCode", &self.status_code.as_u16())?;
serializer.serialize_entry("body", &self.body)?;
serializer.end()
}
}
#[tracing::instrument(skip_all, fields(?event), err)]
async fn function_handler(
event: LambdaEvent<EventBridgeEvent<serde_json::Value>>,
config: &Config,
) -> Result<StatusResponse, Error> {
tracing::info!("function handler called");
let kube_client = connect_to_cluster(config).await?;
let s3_client = connect_to_s3(config).await?;
let nodes_azs = get_nodes_azs(kube_client.clone()).await?;
let mut pods_info = get_current_pods(kube_client.clone(), &nodes_azs).await?;
pods_info.sort_unstable();
let mut csv = Vec::with_capacity(64 * 1024);
write_csv(&pods_info, &mut csv)?;
tracing::info!(
"csv is {} bytes, containing {} pods",
csv.len(),
pods_info.len()
);
upload_csv(config, &s3_client, &csv).await?;
tracing::info!("pod info successfully stored");
Ok(StatusResponse::ok())
}
#[derive(Debug, serde::Serialize, PartialEq, Eq, PartialOrd, Ord)]
struct PodInfo<'a> {
namespace: String,
name: String,
ip: String,
creation_time: String,
node: String,
az: Option<&'a str>,
}
#[tracing::instrument(skip_all, err)]
async fn connect_to_cluster(config: &Config) -> Result<kube::Client, Error> {
let sdk_config = config.eks_cluster.create_sdk_config().await?;
let eks_client = aws_sdk_eks::Client::new(&sdk_config);
let resp = eks_client
.describe_cluster()
.name(&config.eks_cluster.name)
.send()
.await?;
let cluster = resp
.cluster()
.ok_or_else(|| format!("cluster not found: {}", config.eks_cluster.name))?;
let endpoint = cluster.endpoint().ok_or("cluster endpoint not found")?;
let ca_data = cluster
.certificate_authority()
.and_then(|ca| ca.data())
.ok_or("cluster certificate data not found")?;
let mut k8s_config = kube::Config::new(endpoint.parse()?);
let cert_bytes = STANDARD.decode(ca_data)?;
let certs = rustls_pemfile::certs(&mut cert_bytes.as_slice())
.map(|c| c.map(|c| c.to_vec()))
.collect::<Result<_, _>>()?;
k8s_config.root_cert = Some(certs);
k8s_config.auth_info.token = Some(
create_kube_auth_token(
&sdk_config,
&config.eks_cluster.name,
Duration::from_secs(10 * 60),
)
.await?,
);
tracing::info!("cluster description completed");
Ok(kube::Client::try_from(k8s_config)?)
}
#[tracing::instrument(skip_all, err)]
async fn create_kube_auth_token(
sdk_config: &aws_config::SdkConfig,
cluster_name: &str,
expires_in: Duration,
) -> Result<SecretString, Error> {
let identity = sdk_config
.credentials_provider()
.unwrap()
.provide_credentials()
.await?
.into();
let region = sdk_config.region().expect("region").as_ref();
let host = format!("sts.{region}.amazonaws.com");
let get_caller_id_url = format!("https://{host}/?Action=GetCallerIdentity&Version=2011-06-15");
let mut signing_settings = SigningSettings::default();
signing_settings.signature_location = SignatureLocation::QueryParams;
signing_settings.expires_in = Some(expires_in);
let signing_params = v4::SigningParams::builder()
.identity(&identity)
.region(region)
.name("sts")
.time(SystemTime::now())
.settings(signing_settings)
.build()?
.into();
let signable_request = SignableRequest::new(
"GET",
&get_caller_id_url,
[("host", host.as_str()), ("x-k8s-aws-id", cluster_name)].into_iter(),
SignableBody::Bytes(&[]),
)?;
let (signing_instructions, _signature) = sign(signable_request, &signing_params)?.into_parts();
let mut token_request = http::Request::get(get_caller_id_url).body(()).unwrap();
signing_instructions.apply_to_request_http1x(&mut token_request);
let token = format!(
"k8s-aws-v1.{}",
BASE64_STANDARD_NO_PAD.encode(token_request.uri().to_string())
)
.into();
Ok(token)
}
#[tracing::instrument(skip_all, err)]
async fn connect_to_s3(config: &Config) -> Result<aws_sdk_s3::Client, Error> {
let sdk_config = config.s3_bucket.create_sdk_config().await?;
let s3_client = aws_sdk_s3::Client::from_conf(
aws_sdk_s3::config::Builder::from(&sdk_config)
.retry_config(RetryConfig::standard())
.build(),
);
Ok(s3_client)
}
#[tracing::instrument(skip_all, err)]
async fn get_nodes_azs(client: kube::Client) -> Result<HashMap<String, String>, Error> {
let nodes = Api::<Node>::all(client);
let list_params = ListParams::default().timeout(10);
let mut nodes_azs = HashMap::default();
for node in nodes.list(&list_params).await? {
let Some(name) = node.metadata.name else {
tracing::warn!("pod without name");
continue;
};
let Some(mut labels) = node.metadata.labels else {
tracing::warn!(name, "pod without labels");
continue;
};
let Some(az) = labels.remove(AZ_LABEL) else {
tracing::warn!(name, "pod without AZ label");
continue;
};
tracing::debug!(name, az, "adding node");
nodes_azs.insert(name, az);
}
Ok(nodes_azs)
}
#[tracing::instrument(skip_all, err)]
async fn get_current_pods(
client: kube::Client,
node_az: &HashMap<String, String>,
) -> Result<Vec<PodInfo<'_>>, Error> {
let pods = Api::<Pod>::all(client);
let mut pods_info = vec![];
let mut continuation_token = Some(String::new());
while let Some(token) = continuation_token {
let list_params = ListParams::default()
.timeout(10)
.limit(500)
.continue_token(&token);
let list = pods.list(&list_params).await?;
continuation_token = list.metadata.continue_;
tracing::info!("received list of {} pods", list.items.len());
for pod in list.items {
let name = pod.name_any();
let Some(namespace) = pod.namespace() else {
tracing::warn!(name, "pod without namespace");
continue;
};
let Some(status) = pod.status else {
tracing::warn!(namespace, name, "pod without status");
continue;
};
let Some(conditions) = status.conditions else {
tracing::warn!(namespace, name, "pod without conditions");
continue;
};
let Some(ready_condition) = conditions.iter().find(|cond| cond.type_ == "Ready") else {
tracing::debug!(namespace, name, "pod not ready");
continue;
};
let Some(ref ready_time) = ready_condition.last_transition_time else {
tracing::warn!(
namespace,
name,
"pod ready condition without transition time"
);
continue;
};
let Some(spec) = pod.spec else {
tracing::warn!(namespace, name, "pod without spec");
continue;
};
let Some(node) = spec.node_name else {
tracing::warn!(namespace, name, "pod without node");
continue;
};
let Some(ip) = status.pod_ip else {
tracing::warn!(namespace, name, "pod without IP");
continue;
};
let az = node_az.get(&node).map(String::as_str);
let creation_time = ready_time.0.to_rfc3339_opts(SecondsFormat::Secs, true);
let pod_info = PodInfo {
namespace,
name,
ip,
creation_time,
node,
az,
};
tracing::debug!(?pod_info, "adding pod");
pods_info.push(pod_info);
}
}
Ok(pods_info)
}
#[tracing::instrument(skip_all, err)]
fn write_csv<W: io::Write>(pods_info: &Vec<PodInfo>, writer: W) -> Result<(), Error> {
let mut w = csv::Writer::from_writer(writer);
for pod in pods_info {
w.serialize(pod)?;
}
w.flush()?;
Ok(())
}
#[tracing::instrument(skip_all, err)]
async fn upload_csv(
config: &Config,
s3_client: &aws_sdk_s3::Client,
csv: &[u8],
) -> Result<aws_sdk_s3::operation::put_object::PutObjectOutput, Error> {
let mut hasher = Sha256::new();
hasher.update(csv);
let csum = hasher.finalize();
let resp = s3_client
.put_object()
.bucket(&config.s3_bucket.name)
.key(&config.s3_bucket.key)
.content_type("text/csv")
.checksum_algorithm(ChecksumAlgorithm::Sha256)
.checksum_sha256(STANDARD.encode(csum))
.body(ByteStream::from(SdkBody::from(csv)))
.expected_bucket_owner(&config.aws_account_id)
.send()
.await?;
Ok(resp)
}

View File

@@ -1,3 +0,0 @@
fn main() -> Result<(), lambda_runtime::Error> {
pod_info_dumper::start()
}

View File

@@ -5,6 +5,14 @@ use crate::privilege::Privilege;
use crate::responses::ComputeCtlConfig;
use crate::spec::{ComputeSpec, ExtVersion, PgIdent};
/// When making requests to the `compute_ctl` external HTTP server, the client
/// must specify a set of claims in `Authorization` header JWTs such that
/// `compute_ctl` can authorize the request.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ComputeClaims {
pub compute_id: String,
}
/// Request of the /configure API
///
/// We now pass only `spec` in the configuration request, but later we can
@@ -30,9 +38,3 @@ pub struct SetRoleGrantsRequest {
pub privileges: Vec<Privilege>,
pub role: PgIdent,
}
/// Request of the /configure_telemetry API
#[derive(Debug, Deserialize, Serialize)]
pub struct ConfigureTelemetryRequest {
pub logs_export_host: Option<String>,
}

View File

@@ -168,6 +168,10 @@ pub struct ComputeSpec {
/// Extensions should be present in shared_preload_libraries
#[serde(default)]
pub audit_log_level: ComputeAudit,
/// Hostname and the port of the otel collector. Leave empty to disable Postgres logs forwarding.
/// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
pub logs_export_host: Option<String>,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
@@ -179,9 +183,6 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity.
ActivityMonitorExperimental,
/// Allow to configure rsyslog for Postgres logs export
PostgresLogsExport,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.

View File

@@ -51,9 +51,54 @@ pub struct NodeMetadata {
/// If there cannot be a static default value because we need to make runtime
/// checks to determine the default, make it an `Option` (which defaults to None).
/// The runtime check should be done in the consuming crate, i.e., `pageserver`.
///
/// Unknown fields are silently ignored during deserialization.
/// The alternative, which we used in the past, was to set `deny_unknown_fields`,
/// which fails deserialization, and hence pageserver startup, if there is an unknown field.
/// The reason we don't do that anymore is that it complicates
/// usage of config fields for feature flagging, which we commonly do for
/// region-by-region rollouts.
/// The complications mainly arise because the `pageserver.toml` contents on a
/// prod server have a separate lifecycle from the pageserver binary.
/// For instance, `pageserver.toml` contents today are defined in the internal
/// infra repo, and thus introducing a new config field to pageserver and
/// rolling it out to prod servers are separate commits in separate repos
/// that can't be made or rolled back atomically.
/// Rollbacks in particular pose a risk with deny_unknown_fields because
/// the old pageserver binary may reject a new config field, resulting in
/// an outage unless the person doing the pageserver rollback remembers
/// to also revert the commit that added the config field in to the
/// `pageserver.toml` templates in the internal infra repo.
/// (A pre-deploy config check would eliminate this risk during rollbacks,
/// cf [here](https://github.com/neondatabase/cloud/issues/24349).)
/// In addition to this compatibility problem during emergency rollbacks,
/// deny_unknown_fields adds further complications when decomissioning a feature
/// flag: with deny_unknown_fields, we can't remove a flag from the [`ConfigToml`]
/// until all prod servers' `pageserver.toml` files have been updated to a version
/// that doesn't specify the flag. Otherwise new software would fail to start up.
/// This adds the requirement for an intermediate step where the new config field
/// is accepted but ignored, prolonging the decomissioning process by an entire
/// release cycle.
/// By contrast with unknown fields silently ignored, decomissioning a feature
/// flag is a one-step process: we can skip the intermediate step and straight
/// remove the field from the [`ConfigToml`]. We leave the field in the
/// `pageserver.toml` files on prod servers until we reach certainty that we
/// will not roll back to old software whose behavior was dependent on config.
/// Then we can remove the field from the templates in the internal infra repo.
/// This process is [documented internally](
/// https://docs.neon.build/storage/pageserver_configuration.html).
///
/// Note that above relaxed compatbility for the config format does NOT APPLY
/// TO THE STORAGE FORMAT. As general guidance, when introducing storage format
/// changes, ensure that the potential rollback target version will be compatible
/// with the new format. This must hold regardless of what flags are set in in the `pageserver.toml`:
/// any format version that exists in an environment must be compatible with the software that runs there.
/// Use a pageserver.toml flag only to gate whether software _writes_ the new format.
/// For more compatibility considerations, refer to [internal docs](
/// https://docs.neon.build/storage/compat.html?highlight=compat#format-versions--compatibility)
#[serde_as]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(default, deny_unknown_fields)]
#[serde(default)]
pub struct ConfigToml {
// types mapped 1:1 into the runtime PageServerConfig type
pub listen_pg_addr: String,
@@ -138,7 +183,6 @@ pub struct ConfigToml {
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DiskUsageEvictionTaskConfig {
pub max_usage_pct: utils::serde_percent::Percent,
pub min_avail_bytes: u64,
@@ -153,13 +197,11 @@ pub struct DiskUsageEvictionTaskConfig {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum PageServicePipeliningConfig {
Serial,
Pipelined(PageServicePipeliningConfigPipelined),
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PageServicePipeliningConfigPipelined {
/// Causes runtime errors if larger than max get_vectored batch size.
pub max_batch_size: NonZeroUsize,
@@ -175,7 +217,6 @@ pub enum PageServiceProtocolPipelinedExecutionStrategy {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum GetVectoredConcurrentIo {
/// The read path is fully sequential: layers are visited
/// one after the other and IOs are issued and waited upon
@@ -294,7 +335,7 @@ pub struct MaxVectoredReadBytes(pub NonZeroUsize);
/// Tenant-level configuration values, used for various purposes.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[serde(default)]
pub struct TenantConfigToml {
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the

View File

@@ -613,8 +613,7 @@ mod tests {
use rand::{RngCore, SeedableRng};
use super::*;
use crate::models::ShardParameters;
use crate::shard::{ShardCount, ShardNumber};
use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber, ShardStripeSize};
// Helper function to create a key range.
//
@@ -964,12 +963,8 @@ mod tests {
}
#[test]
fn sharded_range_relation_gap() {
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let shard_identity =
ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let range = ShardedRange::new(
Range {
@@ -985,12 +980,8 @@ mod tests {
#[test]
fn shard_identity_keyspaces_single_key() {
let shard_identity = ShardIdentity::new(
ShardNumber(1),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let shard_identity =
ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let range = ShardedRange::new(
Range {
@@ -1034,12 +1025,8 @@ mod tests {
#[test]
fn shard_identity_keyspaces_forkno_gap() {
let shard_identity = ShardIdentity::new(
ShardNumber(1),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let shard_identity =
ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let range = ShardedRange::new(
Range {
@@ -1061,7 +1048,7 @@ mod tests {
let shard_identity = ShardIdentity::new(
ShardNumber(shard_number),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
DEFAULT_STRIPE_SIZE,
)
.unwrap();
@@ -1144,37 +1131,44 @@ mod tests {
/// for a single tenant.
#[test]
fn sharded_range_fragment_simple() {
const SHARD_COUNT: u8 = 4;
const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
)
.unwrap();
// A range which we happen to know covers exactly one stripe which belongs to this shard
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
let mut input_end = input_start;
input_end.field6 += STRIPE_SIZE; // field6 is block number
// Ask for stripe_size blocks, we get the whole stripe
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 32768),
(32768, vec![(32768, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE),
(STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
);
// Ask for more, we still get the whole stripe
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 10000000),
(32768, vec![(32768, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 10 * STRIPE_SIZE),
(STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
);
// Ask for target_nblocks of half the stripe size, we get two halves
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 16384),
do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE / 2),
(
32768,
STRIPE_SIZE,
vec![
(16384, input_start..input_start.add(16384)),
(16384, input_start.add(16384)..input_end)
(
STRIPE_SIZE / 2,
input_start..input_start.add(STRIPE_SIZE / 2)
),
(STRIPE_SIZE / 2, input_start.add(STRIPE_SIZE / 2)..input_end)
]
)
);
@@ -1182,40 +1176,53 @@ mod tests {
#[test]
fn sharded_range_fragment_multi_stripe() {
const SHARD_COUNT: u8 = 4;
const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
)
.unwrap();
// A range which covers multiple stripes, exactly one of which belongs to the current shard.
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
let mut input_end = input_start;
input_end.field6 += RANGE_SIZE; // field6 is block number
// Ask for all the blocks, get a fragment that covers the whole range but reports
// its size to be just the blocks belonging to our shard.
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 131072),
(32768, vec![(32768, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, RANGE_SIZE),
(STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
);
// Ask for a sub-stripe quantity
// Ask for a sub-stripe quantity that results in 3 fragments.
let limit = STRIPE_SIZE / 3 + 1;
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 16000),
do_fragment(input_start, input_end, &shard_identity, limit),
(
32768,
STRIPE_SIZE,
vec![
(16000, input_start..input_start.add(16000)),
(16000, input_start.add(16000)..input_start.add(32000)),
(768, input_start.add(32000)..input_end),
(limit, input_start..input_start.add(limit)),
(limit, input_start.add(limit)..input_start.add(2 * limit)),
(
STRIPE_SIZE - 2 * limit,
input_start.add(2 * limit)..input_end
),
]
)
);
// Try on a range that starts slightly after our owned stripe
assert_eq!(
do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
(32767, vec![(32767, input_start.add(1)..input_end)])
do_fragment(input_start.add(1), input_end, &shard_identity, RANGE_SIZE),
(
STRIPE_SIZE - 1,
vec![(STRIPE_SIZE - 1, input_start.add(1)..input_end)]
)
);
}
@@ -1223,32 +1230,40 @@ mod tests {
/// a previous relation.
#[test]
fn sharded_range_fragment_starting_from_logical_size() {
const SHARD_COUNT: u8 = 4;
const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;
let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
let mut input_end = Key::from_hex("000000067f00000001000000ae0100000000").unwrap();
input_end.field6 += RANGE_SIZE; // field6 is block number
// Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
)
.unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x10000),
(0x8001, vec![(0x8001, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
(
STRIPE_SIZE + 1,
vec![(STRIPE_SIZE + 1, input_start..input_end)]
)
);
// Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
// store all logical sizes)
let shard_identity = ShardIdentity::new(
ShardNumber(1),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
)
.unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x10000),
(0x1, vec![(0x1, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
(1, vec![(1, input_start..input_end)])
);
}
@@ -1284,12 +1299,8 @@ mod tests {
);
// Same, but using a sharded identity
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let shard_identity =
ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x8000),
(u32::MAX, vec![(u32::MAX, input_start..input_end),])
@@ -1331,7 +1342,7 @@ mod tests {
ShardIdentity::new(
ShardNumber((prng.next_u32() % shard_count) as u8),
ShardCount::new(shard_count as u8),
ShardParameters::DEFAULT_STRIPE_SIZE,
DEFAULT_STRIPE_SIZE,
)
.unwrap()
};

View File

@@ -26,7 +26,7 @@ use utils::{completion, serde_system_time};
use crate::config::Ratio;
use crate::key::{CompactKey, Key};
use crate::reltag::RelTag;
use crate::shard::{ShardCount, ShardStripeSize, TenantShardId};
use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
/// The state of a tenant in this pageserver.
///
@@ -80,10 +80,22 @@ pub enum TenantState {
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping {
/// The barrier can be used to wait for shutdown to complete. The first caller to set
/// Some(Barrier) is responsible for driving shutdown to completion. Subsequent callers
/// will wait for the first caller's existing barrier.
///
/// None is set when an attach is cancelled, to signal to shutdown that the attach has in
/// fact cancelled:
///
/// 1. `shutdown` sees `TenantState::Attaching`, and cancels the tenant.
/// 2. `attach` sets `TenantState::Stopping(None)` and exits.
/// 3. `set_stopping` waits for `TenantState::Stopping(None)` and sets
/// `TenantState::Stopping(Some)` to claim the barrier as the shutdown owner.
//
// Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
// otherwise it will not be skipped during deserialization
#[serde(skip)]
progress: completion::Barrier,
progress: Option<completion::Barrier>,
},
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
@@ -426,8 +438,6 @@ pub struct ShardParameters {
}
impl ShardParameters {
pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
pub fn is_unsharded(&self) -> bool {
self.count.is_unsharded()
}
@@ -437,7 +447,7 @@ impl Default for ShardParameters {
fn default() -> Self {
Self {
count: ShardCount::new(0),
stripe_size: Self::DEFAULT_STRIPE_SIZE,
stripe_size: DEFAULT_STRIPE_SIZE,
}
}
}
@@ -1104,7 +1114,7 @@ pub struct CompactionAlgorithmSettings {
}
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
#[serde(tag = "mode", rename_all = "kebab-case")]
pub enum L0FlushConfig {
#[serde(rename_all = "snake_case")]
Direct { max_concurrency: NonZeroUsize },
@@ -1668,6 +1678,7 @@ pub struct SecondaryProgress {
pub struct TenantScanRemoteStorageShard {
pub tenant_shard_id: TenantShardId,
pub generation: Option<u32>,
pub stripe_size: Option<ShardStripeSize>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
@@ -2719,10 +2730,15 @@ mod tests {
"Activating",
),
(line!(), TenantState::Active, "Active"),
(
line!(),
TenantState::Stopping { progress: None },
"Stopping",
),
(
line!(),
TenantState::Stopping {
progress: utils::completion::Barrier::default(),
progress: Some(completion::Barrier::default()),
},
"Stopping",
),

View File

@@ -58,6 +58,8 @@ pub enum NeonWalRecord {
/// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
/// its references in `timeline.rs`.
will_init: bool,
/// Only append the record if the current image is the same as the one specified in this field.
only_if: Option<String>,
},
}
@@ -81,6 +83,17 @@ impl NeonWalRecord {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
only_if: None,
}
}
#[cfg(feature = "testing")]
pub fn wal_append_conditional(s: impl AsRef<str>, only_if: impl AsRef<str>) -> Self {
Self::Test {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
only_if: Some(only_if.as_ref().to_string()),
}
}
@@ -90,6 +103,7 @@ impl NeonWalRecord {
append: s.as_ref().to_string(),
clear: true,
will_init: false,
only_if: None,
}
}
@@ -99,6 +113,7 @@ impl NeonWalRecord {
append: s.as_ref().to_string(),
clear: true,
will_init: true,
only_if: None,
}
}
}

View File

@@ -78,6 +78,12 @@ impl Default for ShardStripeSize {
}
}
impl std::fmt::Display for ShardStripeSize {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// Layout version: for future upgrades where we might change how the key->shard mapping works
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardLayout(u8);
@@ -86,8 +92,11 @@ const LAYOUT_V1: ShardLayout = ShardLayout(1);
/// ShardIdentity uses a magic layout value to indicate if it is unusable
const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
/// Default stripe size in pages: 256MiB divided by 8kiB page size.
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
/// The default stripe size in pages. 16 MiB divided by 8 kiB page size.
///
/// A lower stripe size distributes ingest load better across shards, but reduces IO amortization.
/// 16 MiB appears to be a reasonable balance: <https://github.com/neondatabase/neon/pull/10510>.
pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(16 * 1024 / 8);
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ShardConfigError {
@@ -537,7 +546,7 @@ mod tests {
field6: 0x7d06,
};
let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
let shard = key_to_shard_number(ShardCount(10), ShardStripeSize(32768), &key);
assert_eq!(shard, ShardNumber(8));
}

View File

@@ -5,7 +5,6 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use std::future::Future;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::{AsRawFd, RawFd};
use std::pin::Pin;
@@ -227,7 +226,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> MaybeWriteOnly<IO> {
match self {
MaybeWriteOnly::Full(framed) => framed.read_startup_message().await,
MaybeWriteOnly::WriteOnly(_) => {
Err(io::Error::new(ErrorKind::Other, "reading from write only half").into())
Err(io::Error::other("reading from write only half").into())
}
MaybeWriteOnly::Broken => panic!("IO on invalid MaybeWriteOnly"),
}
@@ -237,7 +236,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> MaybeWriteOnly<IO> {
match self {
MaybeWriteOnly::Full(framed) => framed.read_message().await,
MaybeWriteOnly::WriteOnly(_) => {
Err(io::Error::new(ErrorKind::Other, "reading from write only half").into())
Err(io::Error::other("reading from write only half").into())
}
MaybeWriteOnly::Broken => panic!("IO on invalid MaybeWriteOnly"),
}
@@ -975,7 +974,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> AsyncWrite for CopyDataWriter<'_, IO> {
.write_message_noflush(&BeMessage::CopyData(buf))
// write_message only writes to the buffer, so it can fail iff the
// message is invaid, but CopyData can't be invalid.
.map_err(|_| io::Error::new(ErrorKind::Other, "failed to serialize CopyData"))?;
.map_err(|_| io::Error::other("failed to serialize CopyData"))?;
Poll::Ready(Ok(buf.len()))
}

View File

@@ -85,8 +85,8 @@ static KEY: Lazy<rustls::pki_types::PrivateKeyDer<'static>> = Lazy::new(|| {
static CERT: Lazy<rustls::pki_types::CertificateDer<'static>> = Lazy::new(|| {
let mut cursor = Cursor::new(include_bytes!("cert.pem"));
let cert = rustls_pemfile::certs(&mut cursor).next().unwrap().unwrap();
cert
rustls_pemfile::certs(&mut cursor).next().unwrap().unwrap()
});
// test that basic select with ssl works

View File

@@ -35,7 +35,7 @@ impl ConnectionError {
pub fn into_io_error(self) -> io::Error {
match self {
ConnectionError::Io(io) => io,
ConnectionError::Protocol(pe) => io::Error::new(io::ErrorKind::Other, pe.to_string()),
ConnectionError::Protocol(pe) => io::Error::other(pe.to_string()),
}
}
}

View File

@@ -257,7 +257,7 @@ pub enum ProtocolError {
impl ProtocolError {
/// Proxy stream.rs uses only io::Error; provide it.
pub fn into_io_error(self) -> io::Error {
io::Error::new(io::ErrorKind::Other, self.to_string())
io::Error::other(self.to_string())
}
}

View File

@@ -212,7 +212,7 @@ impl ScramSha256 {
password,
channel_binding,
} => (nonce, password, channel_binding),
_ => return Err(io::Error::new(io::ErrorKind::Other, "invalid SCRAM state")),
_ => return Err(io::Error::other("invalid SCRAM state")),
};
let message =
@@ -291,7 +291,7 @@ impl ScramSha256 {
server_key,
auth_message,
} => (server_key, auth_message),
_ => return Err(io::Error::new(io::ErrorKind::Other, "invalid SCRAM state")),
_ => return Err(io::Error::other("invalid SCRAM state")),
};
let message =
@@ -301,10 +301,7 @@ impl ScramSha256 {
let verifier = match parsed {
ServerFinalMessage::Error(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("SCRAM error: {}", e),
));
return Err(io::Error::other(format!("SCRAM error: {}", e)));
}
ServerFinalMessage::Verifier(verifier) => verifier,
};

View File

@@ -28,7 +28,7 @@ toml_edit.workspace = true
tracing.workspace = true
scopeguard.workspace = true
metrics.workspace = true
utils.workspace = true
utils = { path = "../utils", default-features = false }
pin-project-lite.workspace = true
azure_core.workspace = true

View File

@@ -801,8 +801,7 @@ where
// that support needs to be hacked in.
//
// including {self:?} into the message would be useful, but unsure how to unproject.
_ => std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
_ => std::task::Poll::Ready(Err(std::io::Error::other(
"cloned or initial values cannot be read",
))),
}
@@ -855,7 +854,7 @@ where
};
Err(azure_core::error::Error::new(
azure_core::error::ErrorKind::Io,
std::io::Error::new(std::io::ErrorKind::Other, msg),
std::io::Error::other(msg),
))
}

View File

@@ -5,7 +5,8 @@ edition.workspace = true
license.workspace = true
[features]
default = []
default = ["rename_noreplace"]
rename_noreplace = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
@@ -35,7 +36,7 @@ serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio = { workspace = true, features = ["signal"] }
tokio-tar.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = ["serde"] }

View File

@@ -173,7 +173,7 @@ impl std::fmt::Debug for JwtAuth {
}
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result<String> {
pub fn encode_from_key_file<S: Serialize>(claims: &S, key_data: &[u8]) -> Result<String> {
let key = EncodingKey::from_ed_pem(key_data)?;
Ok(encode(&Header::new(STORAGE_TOKEN_ALGORITHM), claims, &key)?)
}

View File

@@ -81,12 +81,9 @@ pub fn path_with_suffix_extension(
}
pub fn fsync_file_and_parent(file_path: &Utf8Path) -> io::Result<()> {
let parent = file_path.parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
format!("File {file_path:?} has no parent"),
)
})?;
let parent = file_path
.parent()
.ok_or_else(|| io::Error::other(format!("File {file_path:?} has no parent")))?;
fsync(file_path)?;
fsync(parent)?;

View File

@@ -0,0 +1,26 @@
use std::time::{Duration, Instant};
#[derive(Default)]
pub struct ElapsedAccum {
accum: Duration,
}
impl ElapsedAccum {
pub fn get(&self) -> Duration {
self.accum
}
pub fn guard(&mut self) -> impl Drop + '_ {
let start = Instant::now();
scopeguard::guard(start, |last_wait_at| {
self.accum += Instant::now() - last_wait_at;
})
}
pub async fn measure<Fut, O>(&mut self, fut: Fut) -> O
where
Fut: Future<Output = O>,
{
let _guard = self.guard();
fut.await
}
}

View File

@@ -3,7 +3,9 @@ use std::{fs, io, path::Path};
use anyhow::Context;
#[cfg(feature = "rename_noreplace")]
mod rename_noreplace;
#[cfg(feature = "rename_noreplace")]
pub use rename_noreplace::rename_noreplace;
pub trait PathExt {

View File

@@ -8,7 +8,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
dst: &P2,
) -> nix::Result<()> {
{
#[cfg(target_os = "linux")]
#[cfg(all(target_os = "linux", target_env = "gnu"))]
{
nix::fcntl::renameat2(
None,
@@ -29,7 +29,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
})??;
nix::errno::Errno::result(res).map(drop)
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
#[cfg(not(any(all(target_os = "linux", target_env = "gnu"), target_os = "macos")))]
{
std::compile_error!("OS does not support no-replace renames");
}

View File

@@ -93,6 +93,8 @@ pub mod try_rcu;
pub mod guard_arc_swap;
pub mod elapsed_accum;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;

View File

@@ -1,6 +1,8 @@
pub use signal_hook::consts::TERM_SIGNALS;
pub use signal_hook::consts::signal::*;
use signal_hook::iterator::Signals;
use tokio::signal::unix::{SignalKind, signal};
use tracing::info;
pub enum Signal {
Quit,
@@ -36,3 +38,30 @@ impl ShutdownSignals {
Ok(())
}
}
/// Runs in a loop since we want to be responsive to multiple signals
/// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown)
/// <https://github.com/neondatabase/neon/issues/9740>
pub async fn signal_handler(token: tokio_util::sync::CancellationToken) {
let mut sigint = signal(SignalKind::interrupt()).unwrap();
let mut sigterm = signal(SignalKind::terminate()).unwrap();
let mut sigquit = signal(SignalKind::quit()).unwrap();
loop {
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode.");
std::process::exit(111);
}
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
if !token.is_cancelled() {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode.");
token.cancel();
} else {
info!("Got signal {signal}. Already shutting down.");
}
}
}

View File

@@ -111,9 +111,17 @@ impl<T> OnceCell<T> {
}
}
/// Like [`Self::get_or_init_detached_measured`], but without out parameter for time spent waiting.
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
self.get_or_init_detached_measured(None).await
}
/// Returns a guard to an existing initialized value, or returns an unique initialization
/// permit which can be used to initialize this `OnceCell` using `OnceCell::set`.
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
pub async fn get_or_init_detached_measured(
&self,
mut wait_time: Option<&mut crate::elapsed_accum::ElapsedAccum>,
) -> Result<Guard<'_, T>, InitPermit> {
// It looks like OnceCell::get_or_init could be implemented using this method instead of
// duplication. However, that makes the future be !Send due to possibly holding on to the
// MutexGuard over an await point.
@@ -125,12 +133,16 @@ impl<T> OnceCell<T> {
}
guard.init_semaphore.clone()
};
{
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire().await
let fut = sem.acquire();
if let Some(wait_time) = wait_time.as_mut() {
wait_time.measure(fut).await
} else {
fut.await
}
};
let Ok(permit) = permit else {

28
object_storage/Cargo.toml Normal file
View File

@@ -0,0 +1,28 @@
[package]
name = "object_storage"
version = "0.0.1"
edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
axum-extra.workspace = true
axum.workspace = true
camino.workspace = true
futures.workspace = true
jsonwebtoken.workspace = true
prometheus.workspace = true
remote_storage.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
utils = { path = "../libs/utils", default-features = false }
workspace_hack.workspace = true
[dev-dependencies]
camino-tempfile.workspace = true
http-body-util.workspace = true
itertools.workspace = true
rand.workspace = true
test-log.workspace = true
tower.workspace = true

561
object_storage/src/app.rs Normal file
View File

@@ -0,0 +1,561 @@
use anyhow::anyhow;
use axum::body::{Body, Bytes};
use axum::response::{IntoResponse, Response};
use axum::{Router, http::StatusCode};
use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use remote_storage::TimeoutOrCancel;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use utils::backoff::retry;
pub fn app(state: Arc<Storage>) -> Router<()> {
use axum::routing::{delete as _delete, get as _get};
let delete_prefix = _delete(delete_prefix);
Router::new()
.route(
"/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
_get(get).put(set).delete(delete),
)
.route(
"/{tenant_id}/{timeline_id}/{endpoint_id}",
delete_prefix.clone(),
)
.route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
.route("/{tenant_id}", delete_prefix)
.route("/metrics", _get(metrics))
.route("/status", _get(async || StatusCode::OK.into_response()))
.with_state(state)
}
type Result = anyhow::Result<Response, Response>;
type State = axum::extract::State<Arc<Storage>>;
const CONTENT_TYPE: &str = "content-type";
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
const WARN_THRESHOLD: u32 = 3;
const MAX_RETRIES: u32 = 10;
async fn metrics() -> Result {
prometheus::TextEncoder::new()
.encode_to_string(&prometheus::gather())
.map(|s| s.into_response())
.map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
}
async fn get(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "downloading");
let download_err = |e| {
if let DownloadError::NotFound = e {
info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
return not_found(&path);
}
internal_error(e, &path, "downloading")
};
let cancel = state.cancel.clone();
let opts = &DownloadOpts::default();
let stream = retry(
async || state.storage.download(&path, opts, &cancel).await,
DownloadError::is_permanent,
WARN_THRESHOLD,
MAX_RETRIES,
"downloading",
&cancel,
)
.await
.unwrap_or(Err(DownloadError::Cancelled))
.map_err(download_err)?
.download_stream;
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.body(Body::from_stream(stream))
.map_err(|e| internal_error(e, path, "reading response"))
}
// Best solution for files is multipart upload, but remote_storage doesn't support it,
// so we can either read Bytes in memory and push at once or forward BodyDataStream to
// remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a
// guaranteed size() which may produce issues while uploading to s3.
// So, currently we're going with an in-memory copy plus a boundary to prevent uploading
// very large files.
async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
info!(%path, "uploading");
let request_len = bytes.len();
let max_len = state.max_upload_file_limit;
if request_len > max_len {
return Err(bad_request(
anyhow!("File size {request_len} exceeds max {max_len}"),
"uploading",
));
}
let cancel = state.cancel.clone();
let fun = async || {
let stream = bytes_to_stream(bytes.clone());
state
.storage
.upload(stream, request_len, &path, None, &cancel)
.await
};
retry(
fun,
TimeoutOrCancel::caused_by_cancel,
WARN_THRESHOLD,
MAX_RETRIES,
"uploading",
&cancel,
)
.await
.unwrap_or(Err(anyhow!("uploading cancelled")))
.map_err(|e| internal_error(e, path, "reading response"))?;
Ok(ok())
}
async fn delete(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "deleting");
let cancel = state.cancel.clone();
retry(
async || state.storage.delete(&path, &cancel).await,
TimeoutOrCancel::caused_by_cancel,
WARN_THRESHOLD,
MAX_RETRIES,
"deleting",
&cancel,
)
.await
.unwrap_or(Err(anyhow!("deleting cancelled")))
.map_err(|e| internal_error(e, path, "deleting"))?;
Ok(ok())
}
async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
info!(%path, "deleting prefix");
let cancel = state.cancel.clone();
retry(
async || state.storage.delete_prefix(&path, &cancel).await,
TimeoutOrCancel::caused_by_cancel,
WARN_THRESHOLD,
MAX_RETRIES,
"deleting prefix",
&cancel,
)
.await
.unwrap_or(Err(anyhow!("deleting prefix cancelled")))
.map_err(|e| internal_error(e, path, "deleting prefix"))?;
Ok(ok())
}
pub async fn check_storage_permissions(
client: &GenericRemoteStorage,
cancel: CancellationToken,
) -> anyhow::Result<()> {
info!("storage permissions check");
// as_nanos() as multiple instances proxying same bucket may be started at once
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)?
.as_nanos()
.to_string();
let path = RemotePath::from_string(&format!("write_access_{now}"))?;
info!(%path, "uploading");
let body = now.to_string();
let stream = bytes_to_stream(Bytes::from(body.clone()));
client
.upload(stream, body.len(), &path, None, &cancel)
.await?;
use tokio::io::AsyncReadExt;
info!(%path, "downloading");
let download_opts = DownloadOpts {
kind: remote_storage::DownloadKind::Small,
..Default::default()
};
let mut body_read_buf = Vec::new();
let stream = client
.download(&path, &download_opts, &cancel)
.await?
.download_stream;
tokio_util::io::StreamReader::new(stream)
.read_to_end(&mut body_read_buf)
.await?;
let body_read = String::from_utf8(body_read_buf)?;
if body != body_read {
error!(%body, %body_read, "File contents do not match");
anyhow::bail!("Read back file doesn't match original")
}
info!(%path, "removing");
client.delete(&path, &cancel).await
}
fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
futures::stream::once(futures::future::ready(Ok(bytes)))
}
#[cfg(test)]
mod tests {
use super::*;
use axum::{body::Body, extract::Request, response::Response};
use http_body_util::BodyExt;
use itertools::iproduct;
use std::env::var;
use std::sync::Arc;
use std::time::Duration;
use test_log::test as testlog;
use tower::{Service, util::ServiceExt};
use utils::id::{TenantId, TimelineId};
// see libs/remote_storage/tests/test_real_s3.rs
const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
let cancel = CancellationToken::new();
let (dir, storage) = if var(REAL_S3_ENV).is_err() {
// tests execute in parallel and we need a new directory for each of them
let dir = camino_tempfile::tempdir().unwrap();
let fs =
remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
(Some(dir), GenericRemoteStorage::LocalFs(fs))
} else {
// test_real_s3::create_s3_client is hard to reference, reimplementing here
let millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
use rand::Rng;
let random = rand::thread_rng().r#gen::<u32>();
let s3_config = remote_storage::S3Config {
bucket_name: var(REAL_S3_BUCKET).unwrap(),
bucket_region: var(REAL_S3_REGION).unwrap(),
prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
endpoint: None,
concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response: None,
upload_storage_class: None,
};
let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
.await
.unwrap();
(None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
};
let proxy = Storage {
auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
storage,
cancel: cancel.clone(),
max_upload_file_limit: usize::MAX,
};
check_storage_permissions(&proxy.storage, cancel)
.await
.unwrap();
(proxy, dir)
}
// see libs/utils/src/auth.rs
const TEST_PUB_KEY_ED25519: &[u8] = b"
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
-----END PUBLIC KEY-----
";
const TEST_PRIV_KEY_ED25519: &[u8] = br#"
-----BEGIN PRIVATE KEY-----
MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
-----END PRIVATE KEY-----
"#;
async fn request(req: Request<Body>) -> Response<Body> {
let (proxy, _) = proxy().await;
app(Arc::new(proxy))
.into_service()
.oneshot(req)
.await
.unwrap()
}
#[testlog(tokio::test)]
async fn status() {
let res = Request::builder()
.uri("/status")
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert_eq!(res.status(), StatusCode::OK);
}
fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
iproduct!(
vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
vec!["GET", "PUT", "DELETE"]
)
}
#[testlog(tokio::test)]
async fn no_token() {
for (uri, method) in routes() {
info!(%uri, %method);
let res = Request::builder()
.uri(uri)
.method(method)
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert!(matches!(
res.status(),
StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
));
}
}
#[testlog(tokio::test)]
async fn invalid_token() {
for (uri, method) in routes() {
info!(%uri, %method);
let status = Request::builder()
.uri(uri)
.header("Authorization", "Bearer 123")
.method(method)
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert!(matches!(
status.status(),
StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
));
}
}
const TENANT_ID: TenantId =
TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
const TIMELINE_ID: TimelineId =
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = object_storage::Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
#[testlog(tokio::test)]
async fn unauthorized() {
let (proxy, _) = proxy().await;
let mut app = app(Arc::new(proxy)).into_service();
let token = token();
let args = itertools::iproduct!(
vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
vec![ENDPOINT_ID, "ep-ololo"]
)
.skip(1);
for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
info!(%uri, %method, %tenant, %timeline, %endpoint);
let request = Request::builder()
.uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
.method(method)
.header("Authorization", format!("Bearer {}", token))
.body(Body::empty())
.unwrap();
let status = ServiceExt::ready(&mut app)
.await
.unwrap()
.call(request)
.await
.unwrap()
.status();
assert_eq!(status, StatusCode::UNAUTHORIZED);
}
}
#[testlog(tokio::test)]
async fn method_not_allowed() {
let token = token();
let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
for (key, method) in iter {
let status = Request::builder()
.uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
.method(method)
.header("Authorization", format!("Bearer {token}"))
.body(Body::empty())
.map(request)
.unwrap()
.await
.status();
assert!(matches!(
status,
StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
));
}
}
async fn requests_chain(
chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
token: impl Fn(&str) -> String,
) {
let (proxy, _) = proxy().await;
let mut app = app(Arc::new(proxy)).into_service();
for (uri, method, body, expected_status, compare_body) in chain {
info!(%uri, %method, %body, %expected_status);
let bearer = format!("Bearer {}", token(&uri));
let request = Request::builder()
.uri(uri)
.method(method)
.header("Authorization", &bearer)
.body(Body::from(body))
.unwrap();
let response = ServiceExt::ready(&mut app)
.await
.unwrap()
.call(request)
.await
.unwrap();
assert_eq!(response.status(), expected_status);
if !compare_body {
continue;
}
let read_body = response.into_body().collect().await.unwrap().to_bytes();
assert_eq!(body, read_body);
}
}
#[testlog(tokio::test)]
async fn metrics() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
let req = vec![
(uri.clone(), "PUT", "body", StatusCode::OK, false),
(uri.clone(), "DELETE", "", StatusCode::OK, false),
];
requests_chain(req.into_iter(), |_| token()).await;
let res = Request::builder()
.uri("/metrics")
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.into_body().collect().await.unwrap().to_bytes();
let body = String::from_utf8_lossy(&body);
tracing::debug!(%body);
// Storage metrics are not gathered for LocalFs
if var(REAL_S3_ENV).is_ok() {
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
}
assert!(body.contains("process_threads"));
}
#[testlog(tokio::test)]
async fn insert_retrieve_remove() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
let chain = vec![
(uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
(uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
(uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
(uri.clone(), "DELETE", "", StatusCode::OK, false),
(uri, "GET", "", StatusCode::NOT_FOUND, false),
];
requests_chain(chain.into_iter(), |_| token()).await;
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
#[derive(Serialize)]
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<object_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
endpoint_id: parts.get(3).map(ToString::to_string),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
// Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
#[testlog(tokio::test)]
async fn delete_prefix() {
let tenant_id =
TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
// Why extra slash in string literals? Axum is weird with URIs:
// /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND
// as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
// The cost of removing trailing slash is suprisingly hard:
// * Add tower dependency with NormalizePath layer
// * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377
// * Rewrite make_service() -> into_make_service()
// * Rewrite oneshot() (not available for NormalizePath)
// I didn't manage to get it working correctly
let chain = vec![
// create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
(f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
// create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
// create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
(f(t2, ""), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
// create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
(f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
(f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
// create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
(f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
(f(t2, ""), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
(f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
// delete prefix 1 -> empty
(format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
];
requests_chain(chain.into_iter(), delete_prefix_token).await;
}
}

344
object_storage/src/lib.rs Normal file
View File

@@ -0,0 +1,344 @@
use anyhow::Result;
use axum::extract::{FromRequestParts, Path};
use axum::response::{IntoResponse, Response};
use axum::{RequestPartsExt, http::StatusCode, http::request::Parts};
use axum_extra::TypedHeader;
use axum_extra::headers::{Authorization, authorization::Bearer};
use camino::Utf8PathBuf;
use jsonwebtoken::{DecodingKey, Validation};
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::result::Result as StdResult;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use utils::id::{TenantId, TimelineId};
// simplified version of utils::auth::JwtAuth
pub struct JwtAuth {
decoding_key: DecodingKey,
validation: Validation,
}
pub const VALIDATION_ALGO: jsonwebtoken::Algorithm = jsonwebtoken::Algorithm::EdDSA;
impl JwtAuth {
pub fn new(key: &[u8]) -> Result<Self> {
Ok(Self {
decoding_key: DecodingKey::from_ed_pem(key)?,
validation: Validation::new(VALIDATION_ALGO),
})
}
pub fn decode<T: serde::de::DeserializeOwned>(&self, token: &str) -> Result<T> {
Ok(jsonwebtoken::decode(token, &self.decoding_key, &self.validation).map(|t| t.claims)?)
}
}
fn normalize_key(key: &str) -> StdResult<Utf8PathBuf, String> {
let key = clean_utf8(&Utf8PathBuf::from(key));
if key.starts_with("..") || key == "." || key == "/" {
return Err(format!("invalid key {key}"));
}
match key.strip_prefix("/").map(Utf8PathBuf::from) {
Ok(p) => Ok(p),
_ => Ok(key),
}
}
// Copied from path_clean crate with PathBuf->Utf8PathBuf
fn clean_utf8(path: &camino::Utf8Path) -> Utf8PathBuf {
use camino::Utf8Component as Comp;
let mut out = Vec::new();
for comp in path.components() {
match comp {
Comp::CurDir => (),
Comp::ParentDir => match out.last() {
Some(Comp::RootDir) => (),
Some(Comp::Normal(_)) => {
out.pop();
}
None | Some(Comp::CurDir) | Some(Comp::ParentDir) | Some(Comp::Prefix(_)) => {
out.push(comp)
}
},
comp => out.push(comp),
}
}
if !out.is_empty() {
out.iter().collect()
} else {
Utf8PathBuf::from(".")
}
}
pub struct Storage {
pub auth: JwtAuth,
pub storage: GenericRemoteStorage,
pub cancel: CancellationToken,
pub max_upload_file_limit: usize,
}
pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc
#[derive(Deserialize, Serialize, PartialEq)]
pub struct Claims {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub endpoint_id: EndpointId,
pub exp: u64,
}
impl Display for Claims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})",
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
)
}
}
#[derive(Deserialize, Serialize)]
struct KeyRequest {
tenant_id: TenantId,
timeline_id: TimelineId,
endpoint_id: EndpointId,
path: String,
}
#[derive(Debug, PartialEq)]
pub struct S3Path {
pub path: RemotePath,
}
impl TryFrom<&KeyRequest> for S3Path {
type Error = String;
fn try_from(req: &KeyRequest) -> StdResult<Self, Self::Error> {
let KeyRequest {
tenant_id,
timeline_id,
endpoint_id,
path,
} = &req;
let prefix = format!("{tenant_id}/{timeline_id}/{endpoint_id}",);
let path = Utf8PathBuf::from(prefix).join(normalize_key(path)?);
let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
Ok(S3Path { path })
}
}
fn unauthorized(route: impl Display, claims: impl Display) -> Response {
debug!(%route, %claims, "route doesn't match claims");
StatusCode::UNAUTHORIZED.into_response()
}
pub fn bad_request(err: impl Display, desc: &'static str) -> Response {
debug!(%err, desc);
(StatusCode::BAD_REQUEST, err.to_string()).into_response()
}
pub fn ok() -> Response {
StatusCode::OK.into_response()
}
pub fn internal_error(err: impl Display, path: impl Display, desc: &'static str) -> Response {
error!(%err, %path, desc);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
pub fn not_found(key: impl ToString) -> Response {
(StatusCode::NOT_FOUND, key.to_string()).into_response()
}
impl FromRequestParts<Arc<Storage>> for S3Path {
type Rejection = Response;
async fn from_request_parts(
parts: &mut Parts,
state: &Arc<Storage>,
) -> Result<Self, Self::Rejection> {
let Path(path): Path<KeyRequest> = parts
.extract()
.await
.map_err(|e| bad_request(e, "invalid route"))?;
let TypedHeader(Authorization(bearer)) = parts
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: Claims = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
let route = Claims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id: path.endpoint_id.clone(),
exp: claims.exp,
};
if route != claims {
return Err(unauthorized(route, claims));
}
(&path)
.try_into()
.map_err(|e| bad_request(e, "invalid route"))
}
}
#[derive(Deserialize, Serialize, PartialEq)]
pub struct PrefixKeyPath {
pub tenant_id: TenantId,
pub timeline_id: Option<TimelineId>,
pub endpoint_id: Option<EndpointId>,
}
impl Display for PrefixKeyPath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})",
self.tenant_id,
self.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string())
)
}
}
#[derive(Debug, PartialEq)]
pub struct PrefixS3Path {
pub path: RemotePath,
}
impl From<&PrefixKeyPath> for PrefixS3Path {
fn from(path: &PrefixKeyPath) -> Self {
let timeline_id = path
.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string());
let endpoint_id = path
.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string());
let path = Utf8PathBuf::from(path.tenant_id.to_string())
.join(timeline_id)
.join(endpoint_id);
let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
PrefixS3Path { path }
}
}
impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
type Rejection = Response;
async fn from_request_parts(
parts: &mut Parts,
state: &Arc<Storage>,
) -> Result<Self, Self::Rejection> {
let Path(path) = parts
.extract::<Path<PrefixKeyPath>>()
.await
.map_err(|e| bad_request(e, "invalid route"))?;
let TypedHeader(Authorization(bearer)) = parts
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: PrefixKeyPath = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "invalid token"))?;
if path != claims {
return Err(unauthorized(path, claims));
}
Ok((&path).into())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn normalize_key() {
let f = super::normalize_key;
assert_eq!(f("hello/world/..").unwrap(), Utf8PathBuf::from("hello"));
assert_eq!(
f("ololo/1/../../not_ololo").unwrap(),
Utf8PathBuf::from("not_ololo")
);
assert!(f("ololo/1/../../../").is_err());
assert!(f(".").is_err());
assert!(f("../").is_err());
assert!(f("").is_err());
assert_eq!(f("/1/2/3").unwrap(), Utf8PathBuf::from("1/2/3"));
assert!(f("/1/2/3/../../../").is_err());
assert!(f("/1/2/3/../../../../").is_err());
}
const TENANT_ID: TenantId =
TenantId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
const TIMELINE_ID: TimelineId =
TimelineId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
#[test]
fn s3_path() {
let auth = Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let s3_path = |key| {
let path = &format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/{key}");
let path = RemotePath::from_string(path).unwrap();
S3Path { path }
};
let path = "cache_key".to_string();
let mut key_path = KeyRequest {
path,
tenant_id: auth.tenant_id,
timeline_id: auth.timeline_id,
endpoint_id: auth.endpoint_id,
};
assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
key_path.path = "we/can/have/nested/paths".to_string();
assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
key_path.path = "../error/hello/../".to_string();
assert!(S3Path::try_from(&key_path).is_err());
}
#[test]
fn prefix_s3_path() {
let mut path = PrefixKeyPath {
tenant_id: TENANT_ID,
timeline_id: None,
endpoint_id: None,
};
let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
assert_eq!(
PrefixS3Path::from(&path).path,
prefix_path(format!("{TENANT_ID}"))
);
path.timeline_id = Some(TIMELINE_ID);
assert_eq!(
PrefixS3Path::from(&path).path,
prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}"))
);
path.endpoint_id = Some(ENDPOINT_ID.into());
assert_eq!(
PrefixS3Path::from(&path).path,
prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}"))
);
}
}

View File

@@ -0,0 +1,65 @@
//! `object_storage` is a service which provides API for uploading and downloading
//! files. It is used by compute and control plane for accessing LFC prewarm data.
//! This service is deployed either as a separate component or as part of compute image
//! for large computes.
mod app;
use anyhow::Context;
use tracing::info;
use utils::logging;
//see set()
const fn max_upload_file_limit() -> usize {
100 * 1024 * 1024
}
#[derive(serde::Deserialize)]
#[serde(tag = "type")]
struct Config {
listen: std::net::SocketAddr,
pemfile: camino::Utf8PathBuf,
#[serde(flatten)]
storage_config: remote_storage::RemoteStorageConfig,
#[serde(default = "max_upload_file_limit")]
max_upload_file_limit: usize,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)?;
let config: String = std::env::args().skip(1).take(1).collect();
if config.is_empty() {
anyhow::bail!("Usage: object_storage config.json")
}
info!("Reading config from {config}");
let config = std::fs::read_to_string(config.clone())?;
let config: Config = serde_json::from_str(&config).context("parsing config")?;
info!("Reading pemfile from {}", config.pemfile.clone());
let pemfile = std::fs::read(config.pemfile.clone())?;
info!("Loading public key from {}", config.pemfile.clone());
let auth = object_storage::JwtAuth::new(&pemfile)?;
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
info!("listening on {}", listener.local_addr().unwrap());
let storage = remote_storage::GenericRemoteStorage::from_config(&config.storage_config).await?;
let cancel = tokio_util::sync::CancellationToken::new();
app::check_storage_permissions(&storage, cancel.clone()).await?;
let proxy = std::sync::Arc::new(object_storage::Storage {
auth,
storage,
cancel: cancel.clone(),
max_upload_file_limit: config.max_upload_file_limit,
});
tokio::spawn(utils::signals::signal_handler(cancel.clone()));
axum::serve(listener, app::app(proxy))
.with_graceful_shutdown(async move { cancel.cancelled().await })
.await?;
Ok(())
}

View File

@@ -65,7 +65,7 @@ use bytes::{Buf, Bytes};
use criterion::{BenchmarkId, Criterion};
use once_cell::sync::Lazy;
use pageserver::config::PageServerConf;
use pageserver::walredo::PostgresRedoManager;
use pageserver::walredo::{PostgresRedoManager, RedoAttemptType};
use pageserver_api::key::Key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::TenantShardId;
@@ -223,7 +223,14 @@ impl Request {
// TODO: avoid these clones
manager
.request_redo(*key, *lsn, base_img.clone(), records.clone(), *pg_version)
.request_redo(
*key,
*lsn,
base_img.clone(),
records.clone(),
*pg_version,
RedoAttemptType::ReadPage,
)
.await
.context("request_redo")
}

View File

@@ -9,14 +9,14 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, anyhow};
use anyhow::{Context, anyhow, bail};
use camino::Utf8Path;
use clap::{Arg, ArgAction, Command};
use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
use metrics::set_build_info_metric;
use nix::sys::socket::{setsockopt, sockopt};
use pageserver::config::{PageServerConf, PageserverIdentity};
use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
use pageserver::deletion_queue::DeletionQueue;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
@@ -31,7 +31,6 @@ use pageserver::{
};
use postgres_backend::AuthType;
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -80,6 +79,8 @@ fn main() -> anyhow::Result<()> {
return Ok(());
}
let dev_mode = arg_matches.get_flag("dev");
// Initialize up failpoints support
let scenario = failpoint_support::init();
@@ -98,7 +99,21 @@ fn main() -> anyhow::Result<()> {
env::set_current_dir(&workdir)
.with_context(|| format!("Failed to set application's current dir to '{workdir}'"))?;
let conf = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
let (conf, ignored) = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
if !dev_mode {
if matches!(conf.http_auth_type, AuthType::Trust)
|| matches!(conf.pg_auth_type, AuthType::Trust)
{
bail!(
"Pageserver refuses to start with HTTP or PostgreSQL API authentication disabled.\n\
Run with --dev to allow running without authentication.\n\
This is insecure and should only be used in development environments."
);
}
} else {
warn!("Starting in dev mode: this may be an insecure configuration.");
}
// Initialize logging.
//
@@ -144,7 +159,17 @@ fn main() -> anyhow::Result<()> {
&[("node_id", &conf.id.to_string())],
);
// after setting up logging, log the effective IO engine choice and read path implementations
// Warn about ignored config items; see pageserver_api::config::ConfigToml
// doc comment for rationale why we prefer this over serde(deny_unknown_fields).
{
let ignored_fields::Paths { paths } = &ignored;
for path in paths {
warn!(?path, "ignoring unknown configuration item");
}
}
// Log configuration items for feature-flag-like config
// (maybe we should automate this with a visitor?).
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
@@ -207,7 +232,7 @@ fn main() -> anyhow::Result<()> {
tracing::info!("Initializing page_cache...");
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?;
start_pageserver(launch_ts, conf, ignored, otel_guard).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
@@ -217,7 +242,7 @@ fn initialize_config(
identity_file_path: &Utf8Path,
cfg_file_path: &Utf8Path,
workdir: &Utf8Path,
) -> anyhow::Result<&'static PageServerConf> {
) -> anyhow::Result<(&'static PageServerConf, ignored_fields::Paths)> {
// The deployment orchestrator writes out an indentity file containing the node id
// for all pageservers. This file is the source of truth for the node id. In order
// to allow for rolling back pageserver releases, the node id is also included in
@@ -246,16 +271,36 @@ fn initialize_config(
let config_file_contents =
std::fs::read_to_string(cfg_file_path).context("read config file from filesystem")?;
let config_toml = serde_path_to_error::deserialize(
toml_edit::de::Deserializer::from_str(&config_file_contents)
.context("build toml deserializer")?,
)
.context("deserialize config toml")?;
// Deserialize the config file contents into a ConfigToml.
let config_toml: pageserver_api::config::ConfigToml = {
let deserializer = toml_edit::de::Deserializer::from_str(&config_file_contents)
.context("build toml deserializer")?;
let mut path_to_error_track = serde_path_to_error::Track::new();
let deserializer =
serde_path_to_error::Deserializer::new(deserializer, &mut path_to_error_track);
serde::Deserialize::deserialize(deserializer).context("deserialize config toml")?
};
// Find unknown fields by re-serializing the parsed ConfigToml and comparing it to the on-disk file.
// Any fields that are only in the on-disk version are unknown.
// (The assumption here is that the ConfigToml doesn't to skip_serializing_if.)
// (Make sure to read the ConfigToml doc comment on why we only want to warn about, but not fail startup, on unknown fields).
let ignored = {
let ondisk_toml = config_file_contents
.parse::<toml_edit::DocumentMut>()
.context("parse original config as toml document")?;
let parsed_toml = toml_edit::ser::to_document(&config_toml)
.context("re-serialize config to toml document")?;
pageserver::config::ignored_fields::find(ondisk_toml, parsed_toml)
};
// Construct the runtime god object (it's called PageServerConf but actually is just global shared state).
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
.context("runtime-validation of config toml")?;
let conf = Box::leak(Box::new(conf));
Ok(Box::leak(Box::new(conf)))
Ok((conf, ignored))
}
struct WaitForPhaseResult<F: std::future::Future + Unpin> {
@@ -306,6 +351,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
ignored: ignored_fields::Paths,
otel_guard: Option<OtelGuard>,
) -> anyhow::Result<()> {
// Monotonic time for later calculating startup duration
@@ -329,7 +375,7 @@ fn start_pageserver(
pageserver::metrics::tokio_epoll_uring::Collector::new(),
))
.unwrap();
pageserver::preinitialize_metrics(conf);
pageserver::preinitialize_metrics(conf, ignored);
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
@@ -713,32 +759,7 @@ fn start_pageserver(
let signal_token = CancellationToken::new();
let signal_cancel = signal_token.child_token();
// Spawn signal handlers. Runs in a loop since we want to be responsive to multiple signals
// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown). See:
// https://github.com/neondatabase/neon/issues/9740.
tokio::spawn(async move {
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
loop {
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode.");
std::process::exit(111);
}
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
if !signal_token.is_cancelled() {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode.");
signal_token.cancel();
} else {
info!("Got signal {signal}. Already shutting down.");
}
}
});
tokio::spawn(utils::signals::signal_handler(signal_token));
// Wait for cancellation signal and shut down the pageserver.
//
@@ -811,6 +832,12 @@ fn cli() -> Command {
.action(ArgAction::SetTrue)
.help("Show enabled compile time features"),
)
.arg(
Arg::new("dev")
.long("dev")
.action(ArgAction::SetTrue)
.help("Run in development mode (disables security checks)"),
)
}
#[test]

View File

@@ -4,6 +4,8 @@
//! file, or on the command line.
//! See also `settings.md` for better description on every parameter.
pub mod ignored_fields;
use std::env;
use std::num::NonZeroUsize;
use std::sync::Arc;
@@ -560,7 +562,6 @@ impl PageServerConf {
}
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(deny_unknown_fields)]
pub struct PageserverIdentity {
pub id: NodeId,
}
@@ -632,82 +633,4 @@ mod tests {
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect("parse_and_validate");
}
/// If there's a typo in the pageserver config, we'd rather catch that typo
/// and fail pageserver startup than silently ignoring the typo, leaving whoever
/// made it in the believe that their config change is effective.
///
/// The default in serde is to allow unknown fields, so, we rely
/// on developer+review discipline to add `deny_unknown_fields` when adding
/// new structs to the config, and these tests here as a regression test.
///
/// The alternative to all of this would be to allow unknown fields in the config.
/// To catch them, we could have a config check tool or mgmt API endpoint that
/// compares the effective config with the TOML on disk and makes sure that
/// the on-disk TOML is a strict subset of the effective config.
mod unknown_fields_handling {
macro_rules! test {
($short_name:ident, $input:expr) => {
#[test]
fn $short_name() {
let input = $input;
let err = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(&input)
.expect_err("some_invalid_field is an invalid field");
dbg!(&err);
assert!(err.to_string().contains("some_invalid_field"));
}
};
}
use indoc::indoc;
test!(
toplevel,
indoc! {r#"
some_invalid_field = 23
"#}
);
test!(
toplevel_nested,
indoc! {r#"
[some_invalid_field]
foo = 23
"#}
);
test!(
disk_usage_based_eviction,
indoc! {r#"
[disk_usage_based_eviction]
some_invalid_field = 23
"#}
);
test!(
tenant_config,
indoc! {r#"
[tenant_config]
some_invalid_field = 23
"#}
);
test!(
l0_flush,
indoc! {r#"
[l0_flush]
mode = "direct"
some_invalid_field = 23
"#}
);
// TODO: fix this => https://github.com/neondatabase/neon/issues/8915
// test!(
// remote_storage_config,
// indoc! {r#"
// [remote_storage_config]
// local_path = "/nonexistent"
// some_invalid_field = 23
// "#}
// );
}
}

View File

@@ -0,0 +1,179 @@
//! Check for fields in the on-disk config file that were ignored when
//! deserializing [`pageserver_api::config::ConfigToml`].
//!
//! This could have been part of the [`pageserver_api::config`] module,
//! but the way we identify unused fields in this module
//! is specific to the format (TOML) and the implementation of the
//! deserialization for that format ([`toml_edit`]).
use std::collections::HashSet;
use itertools::Itertools;
/// Pass in the user-specified config and the re-serialized [`pageserver_api::config::ConfigToml`].
/// The returned [`Paths`] contains the paths to the fields that were ignored by deserialization
/// of the [`pageserver_api::config::ConfigToml`].
pub fn find(user_specified: toml_edit::DocumentMut, reserialized: toml_edit::DocumentMut) -> Paths {
let user_specified = paths(user_specified);
let reserialized = paths(reserialized);
fn paths(doc: toml_edit::DocumentMut) -> HashSet<String> {
let mut out = Vec::new();
let mut visitor = PathsVisitor::new(&mut out);
visitor.visit_table_like(doc.as_table());
HashSet::from_iter(out)
}
let mut ignored = HashSet::new();
// O(n) because of HashSet
for path in user_specified {
if !reserialized.contains(&path) {
ignored.insert(path);
}
}
Paths {
paths: ignored
.into_iter()
// sort lexicographically for deterministic output
.sorted()
.collect(),
}
}
pub struct Paths {
pub paths: Vec<String>,
}
struct PathsVisitor<'a> {
stack: Vec<String>,
out: &'a mut Vec<String>,
}
impl<'a> PathsVisitor<'a> {
fn new(out: &'a mut Vec<String>) -> Self {
Self {
stack: Vec::new(),
out,
}
}
fn visit_table_like(&mut self, table_like: &dyn toml_edit::TableLike) {
for (entry, item) in table_like.iter() {
self.stack.push(entry.to_string());
self.visit_item(item);
self.stack.pop();
}
}
fn visit_item(&mut self, item: &toml_edit::Item) {
match item {
toml_edit::Item::None => (),
toml_edit::Item::Value(value) => self.visit_value(value),
toml_edit::Item::Table(table) => {
self.visit_table_like(table);
}
toml_edit::Item::ArrayOfTables(array_of_tables) => {
for (i, table) in array_of_tables.iter().enumerate() {
self.stack.push(format!("[{i}]"));
self.visit_table_like(table);
self.stack.pop();
}
}
}
}
fn visit_value(&mut self, value: &toml_edit::Value) {
match value {
toml_edit::Value::String(_)
| toml_edit::Value::Integer(_)
| toml_edit::Value::Float(_)
| toml_edit::Value::Boolean(_)
| toml_edit::Value::Datetime(_) => self.out.push(self.stack.join(".")),
toml_edit::Value::Array(array) => {
for (i, value) in array.iter().enumerate() {
self.stack.push(format!("[{i}]"));
self.visit_value(value);
self.stack.pop();
}
}
toml_edit::Value::InlineTable(inline_table) => self.visit_table_like(inline_table),
}
}
}
#[cfg(test)]
pub(crate) mod tests {
fn test_impl(original: &str, parsed: &str, expect: [&str; 1]) {
let original: toml_edit::DocumentMut = original.parse().expect("parse original config");
let parsed: toml_edit::DocumentMut = parsed.parse().expect("parse re-serialized config");
let super::Paths { paths: actual } = super::find(original, parsed);
assert_eq!(actual, &expect);
}
#[test]
fn top_level() {
test_impl(
r#"
[a]
b = 1
c = 2
d = 3
"#,
r#"
[a]
b = 1
c = 2
"#,
["a.d"],
);
}
#[test]
fn nested() {
test_impl(
r#"
[a.b.c]
d = 23
"#,
r#"
[a]
e = 42
"#,
["a.b.c.d"],
);
}
#[test]
fn array_of_tables() {
test_impl(
r#"
[[a]]
b = 1
c = 2
d = 3
"#,
r#"
[[a]]
b = 1
c = 2
"#,
["a.[0].d"],
);
}
#[test]
fn array() {
test_impl(
r#"
foo = [ {bar = 23} ]
"#,
r#"
foo = [ { blup = 42 }]
"#,
["foo.[0].bar"],
);
}
}

View File

@@ -89,7 +89,7 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use once_cell::sync::Lazy;
use tracing::warn;
@@ -566,6 +566,34 @@ impl RequestContext {
}
}
pub(crate) fn ondemand_download_wait_observe(&self, duration: Duration) {
if duration == Duration::ZERO {
return;
}
match &self.scope {
Scope::Timeline { arc_arc } => arc_arc
.wait_ondemand_download_time
.observe(self.task_kind, duration),
_ => {
use once_cell::sync::Lazy;
use std::sync::Mutex;
use std::time::Duration;
use utils::rate_limit::RateLimit;
static LIMIT: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
let mut guard = LIMIT.lock().unwrap();
guard.call2(|rate_limit_stats| {
warn!(
%rate_limit_stats,
backtrace=%std::backtrace::Backtrace::force_capture(),
"ondemand downloads should always happen within timeline scope",
);
});
}
}
}
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
span.inner().follows_from(from_span.inner());

View File

@@ -212,6 +212,12 @@ paths:
schema:
type: string
format: date-time
"412":
description: No timestamp is found for given LSN, e.g. if there had been no commits till LSN
content:
application/json:
schema:
$ref: "#/components/schemas/PreconditionFailedError"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
parameters:

View File

@@ -67,7 +67,7 @@ use crate::tenant::mgr::{
};
use crate::tenant::remote_timeline_client::index::GcCompactionState;
use crate::tenant::remote_timeline_client::{
download_index_part, list_remote_tenant_shards, list_remote_timelines,
download_index_part, download_tenant_manifest, list_remote_tenant_shards, list_remote_timelines,
};
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
@@ -989,7 +989,7 @@ async fn get_lsn_by_timestamp_handler(
if !tenant_shard_id.is_shard_zero() {
// Requires SLRU contents, which are only stored on shard zero
return Err(ApiError::BadRequest(anyhow!(
"Size calculations are only available on shard zero"
"Lsn calculations by timestamp are only available on shard zero"
)));
}
@@ -1064,7 +1064,7 @@ async fn get_timestamp_of_lsn_handler(
if !tenant_shard_id.is_shard_zero() {
// Requires SLRU contents, which are only stored on shard zero
return Err(ApiError::BadRequest(anyhow!(
"Size calculations are only available on shard zero"
"Timestamp calculations by lsn are only available on shard zero"
)));
}
@@ -1090,8 +1090,8 @@ async fn get_timestamp_of_lsn_handler(
.to_string();
json_response(StatusCode::OK, time)
}
None => Err(ApiError::NotFound(
anyhow::anyhow!("Timestamp for lsn {} not found", lsn).into(),
None => Err(ApiError::PreconditionFailed(
format!("Timestamp for lsn {} not found", lsn).into(),
)),
}
}
@@ -2274,6 +2274,7 @@ async fn timeline_compact_handler(
if Some(true) == parse_query_param::<_, bool>(&request, "dry_run")? {
flags |= CompactFlags::DryRun;
}
// Manual compaction does not yield for L0.
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
@@ -2911,9 +2912,22 @@ async fn tenant_scan_remote_handler(
};
}
let result =
download_tenant_manifest(&state.remote_storage, &tenant_shard_id, generation, &cancel)
.instrument(info_span!("download_tenant_manifest",
tenant_id=%tenant_shard_id.tenant_id,
shard_id=%tenant_shard_id.shard_slug()))
.await;
let stripe_size = match result {
Ok((manifest, _, _)) => manifest.stripe_size,
Err(DownloadError::NotFound) => None,
Err(err) => return Err(ApiError::InternalServerError(anyhow!(err))),
};
response.shards.push(TenantScanRemoteStorageShard {
tenant_shard_id,
generation: generation.into(),
stripe_size,
});
}
@@ -3368,11 +3382,11 @@ async fn put_tenant_timeline_import_basebackup(
let broker_client = state.broker_client.clone();
let mut body = StreamReader::new(request.into_body().map(|res| {
res.map_err(|error| {
std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
})
}));
let mut body = StreamReader::new(
request
.into_body()
.map(|res| res.map_err(|error| std::io::Error::other(anyhow::anyhow!(error)))),
);
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
@@ -3446,7 +3460,7 @@ async fn put_tenant_timeline_import_wal(
let mut body = StreamReader::new(request.into_body().map(|res| {
res.map_err(|error| {
std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
std::io::Error::other( anyhow::anyhow!(error))
})
}));

View File

@@ -1,10 +1,8 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use enum_map::{Enum as _, EnumMap};
@@ -23,13 +21,13 @@ use pageserver_api::config::{
};
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use pin_project_lite::pin_project;
use postgres_backend::{QueryError, is_expected_io_error};
use pq_proto::framed::ConnectionError;
use strum::{EnumCount, IntoEnumIterator as _, VariantNames};
use strum_macros::{IntoStaticStr, VariantNames};
use utils::id::TimelineId;
use crate::config;
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext};
use crate::pgdatadir_mapping::DatadirModificationStats;
@@ -499,6 +497,100 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) mod wait_ondemand_download_time {
use super::*;
const WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS: &[f64] = &[
0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, // 10 ms - 100ms
0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, // 100ms to 1s
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, // 1s to 10s
10.0, 20.0, 30.0, 40.0, 50.0, 60.0, // 10s to 1m
];
/// The task kinds for which we want to track wait times for on-demand downloads.
/// Other task kinds' wait times are accumulated in label value `unknown`.
pub(crate) const WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS: [TaskKind; 2] = [
TaskKind::PageRequestHandler,
TaskKind::WalReceiverConnectionHandler,
];
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL: Lazy<Vec<Histogram>> = Lazy::new(|| {
let histo = register_histogram_vec!(
"pageserver_wait_ondemand_download_seconds_global",
"Observations are individual tasks' wait times for on-demand downloads. \
If N tasks coalesce on an on-demand download, and it takes 10s, than we observe N * 10s.",
&["task_kind"],
WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS.into(),
)
.expect("failed to define a metric");
WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
.iter()
.map(|task_kind| histo.with_label_values(&[task_kind.into()]))
.collect::<Vec<_>>()
});
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_SUM: Lazy<CounterVec> = Lazy::new(|| {
register_counter_vec!(
// use a name that _could_ be evolved into a per-timeline histogram later
"pageserver_wait_ondemand_download_seconds_sum",
"Like `pageserver_wait_ondemand_download_seconds_global` but per timeline",
&["tenant_id", "shard_id", "timeline_id", "task_kind"],
)
.unwrap()
});
pub struct WaitOndemandDownloadTimeSum {
counters: [Counter; WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS.len()],
}
impl WaitOndemandDownloadTimeSum {
pub(crate) fn new(tenant_id: &str, shard_id: &str, timeline_id: &str) -> Self {
let counters = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
.iter()
.map(|task_kind| {
WAIT_ONDEMAND_DOWNLOAD_TIME_SUM
.get_metric_with_label_values(&[
tenant_id,
shard_id,
timeline_id,
task_kind.into(),
])
.unwrap()
})
.collect::<Vec<_>>();
Self {
counters: counters.try_into().unwrap(),
}
}
pub(crate) fn observe(&self, task_kind: TaskKind, duration: Duration) {
let maybe = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
.iter()
.enumerate()
.find(|(_, kind)| **kind == task_kind);
let Some((idx, _)) = maybe else {
return;
};
WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL[idx].observe(duration.as_secs_f64());
let counter = &self.counters[idx];
counter.inc_by(duration.as_secs_f64());
}
}
pub(crate) fn shutdown_timeline(tenant_id: &str, shard_id: &str, timeline_id: &str) {
for task_kind in WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS {
let _ = WAIT_ONDEMAND_DOWNLOAD_TIME_SUM.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
task_kind.into(),
]);
}
}
pub(crate) fn preinitialize_global_metrics() {
Lazy::force(&WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL);
}
}
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
@@ -2314,13 +2406,18 @@ impl RemoteOpFileKind {
}
}
pub(crate) static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
pub(crate) static REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_remote_operation_seconds",
"Time spent on remote storage operations. \
Grouped by tenant, timeline, operation_kind and status. \
"pageserver_remote_timeline_client_seconds_global",
"Time spent on remote timeline client operations. \
Grouped by task_kind, file_kind, operation_kind and status. \
The task_kind is \
- for layer downloads, populated from RequestContext (primary objective of having the label) \
- for index downloads, set to 'unknown' \
- for any upload operation, set to 'RemoteUploadTask' \
This keeps dimensionality at bay. \
Does not account for time spent waiting in remote timeline client's queues.",
&["file_kind", "op_kind", "status"]
&["task_kind", "file_kind", "op_kind", "status"]
)
.expect("failed to define a metric")
});
@@ -2882,6 +2979,7 @@ pub(crate) struct TimelineMetrics {
pub storage_io_size: StorageIoSizeMetrics,
pub wait_lsn_in_progress_micros: GlobalAndPerTenantIntCounter,
pub wait_lsn_start_finish_counterpair: IntCounterPair,
pub wait_ondemand_download_time: wait_ondemand_download_time::WaitOndemandDownloadTimeSum,
shutdown: std::sync::atomic::AtomicBool,
}
@@ -3027,6 +3125,13 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let wait_ondemand_download_time =
wait_ondemand_download_time::WaitOndemandDownloadTimeSum::new(
&tenant_id,
&shard_id,
&timeline_id,
);
TimelineMetrics {
tenant_id,
shard_id,
@@ -3060,6 +3165,7 @@ impl TimelineMetrics {
wal_records_received,
wait_lsn_in_progress_micros,
wait_lsn_start_finish_counterpair,
wait_ondemand_download_time,
shutdown: std::sync::atomic::AtomicBool::default(),
}
}
@@ -3252,6 +3358,8 @@ impl TimelineMetrics {
.remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id]);
}
wait_ondemand_download_time::shutdown_timeline(tenant_id, shard_id, timeline_id);
let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
tenant_id,
@@ -3373,13 +3481,18 @@ impl RemoteTimelineClientMetrics {
pub fn remote_operation_time(
&self,
task_kind: Option<TaskKind>,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
status: &'static str,
) -> Histogram {
let key = (file_kind.as_str(), op_kind.as_str(), status);
REMOTE_OPERATION_TIME
.get_metric_with_label_values(&[key.0, key.1, key.2])
REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY
.get_metric_with_label_values(&[
task_kind.as_ref().map(|tk| tk.into()).unwrap_or("unknown"),
file_kind.as_str(),
op_kind.as_str(),
status,
])
.unwrap()
}
@@ -3624,54 +3737,26 @@ impl Drop for RemoteTimelineClientMetrics {
/// Wrapper future that measures the time spent by a remote storage operation,
/// and records the time and success/failure as a prometheus metric.
pub(crate) trait MeasureRemoteOp: Sized {
fn measure_remote_op(
pub(crate) trait MeasureRemoteOp<O, E>: Sized + Future<Output = Result<O, E>> {
async fn measure_remote_op(
self,
task_kind: Option<TaskKind>, // not all caller contexts have a RequestContext / TaskKind handy
file_kind: RemoteOpFileKind,
op: RemoteOpKind,
metrics: Arc<RemoteTimelineClientMetrics>,
) -> MeasuredRemoteOp<Self> {
) -> Result<O, E> {
let start = Instant::now();
MeasuredRemoteOp {
inner: self,
file_kind,
op,
start,
metrics,
}
let res = self.await;
let duration = start.elapsed();
let status = if res.is_ok() { &"success" } else { &"failure" };
metrics
.remote_operation_time(task_kind, &file_kind, &op, status)
.observe(duration.as_secs_f64());
res
}
}
impl<T: Sized> MeasureRemoteOp for T {}
pin_project! {
pub(crate) struct MeasuredRemoteOp<F>
{
#[pin]
inner: F,
file_kind: RemoteOpFileKind,
op: RemoteOpKind,
start: Instant,
metrics: Arc<RemoteTimelineClientMetrics>,
}
}
impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
type Output = Result<O, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let poll_result = this.inner.poll(cx);
if let Poll::Ready(ref res) = poll_result {
let duration = this.start.elapsed();
let status = if res.is_ok() { &"success" } else { &"failure" };
this.metrics
.remote_operation_time(this.file_kind, this.op, status)
.observe(duration.as_secs_f64());
}
poll_result
}
}
impl<Fut, O, E> MeasureRemoteOp<O, E> for Fut where Fut: Sized + Future<Output = Result<O, E>> {}
pub mod tokio_epoll_uring {
use std::collections::HashMap;
@@ -4107,9 +4192,33 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
.set(u64::try_from(num_threads.get()).unwrap());
}
pub fn preinitialize_metrics(conf: &'static PageServerConf) {
static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_config_ignored_items",
"TOML items present in the on-disk configuration file but ignored by the pageserver config parser.\
The `item` label is the dot-separated path of the ignored item in the on-disk configuration file.\
The value for an unknown config item is always 1.\
There is a special label value \"\", which is 0, so that there is always a metric exposed (simplifies dashboards).",
&["item"]
)
.unwrap()
});
pub fn preinitialize_metrics(
conf: &'static PageServerConf,
ignored: config::ignored_fields::Paths,
) {
set_page_service_config_max_batch_size(&conf.page_service_pipelining);
PAGESERVER_CONFIG_IGNORED_ITEMS
.with_label_values(&[""])
.set(0);
for path in &ignored.paths {
PAGESERVER_CONFIG_IGNORED_ITEMS
.with_label_values(&[path])
.set(1);
}
// Python tests need these and on some we do alerting.
//
// FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of
@@ -4195,4 +4304,5 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);
tenant_throttling::preinitialize_global_metrics();
wait_ondemand_download_time::preinitialize_global_metrics();
}

View File

@@ -247,6 +247,15 @@ pub async fn libpq_listener_main(
type ConnectionHandlerResult = anyhow::Result<()>;
/// Perf root spans start at the per-request level, after shard routing.
/// This struct carries connection-level information to the root perf span definition.
#[derive(Clone)]
struct ConnectionPerfSpanFields {
peer_addr: String,
application_name: Option<String>,
compute_mode: Option<String>,
}
#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
@@ -271,6 +280,12 @@ async fn page_service_conn_main(
let socket_fd = socket.as_raw_fd();
let peer_addr = socket.peer_addr().context("get peer address")?;
let perf_span_fields = ConnectionPerfSpanFields {
peer_addr: peer_addr.to_string(),
application_name: None, // filled in later
compute_mode: None, // filled in later
};
tracing::Span::current().record("peer_addr", field::display(peer_addr));
// setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
@@ -314,6 +329,7 @@ async fn page_service_conn_main(
tenant_manager,
auth,
pipelining_config,
perf_span_fields,
connection_ctx,
cancel.clone(),
gate_guard,
@@ -358,6 +374,8 @@ struct PageServerHandler {
/// `process_query` creates a child context from this one.
connection_ctx: RequestContext,
perf_span_fields: ConnectionPerfSpanFields,
cancel: CancellationToken,
/// None only while pagestream protocol is being processed.
@@ -703,11 +721,13 @@ impl BatchedFeMessage {
}
impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
perf_span_fields: ConnectionPerfSpanFields,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
@@ -717,6 +737,7 @@ impl PageServerHandler {
auth,
claims: None,
connection_ctx,
perf_span_fields,
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
@@ -754,6 +775,7 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_handles: &mut TimelineHandles,
conn_perf_span_fields: &ConnectionPerfSpanFields,
cancel: &CancellationToken,
ctx: &RequestContext,
protocol_version: PagestreamProtocolVersion,
@@ -952,6 +974,9 @@ impl PageServerHandler {
info_span!(
target: PERF_TRACE_TARGET,
"GET_PAGE",
peer_addr = conn_perf_span_fields.peer_addr,
application_name = conn_perf_span_fields.application_name,
compute_mode = conn_perf_span_fields.compute_mode,
tenant_id = %tenant_id,
shard_id = %shard.get_shard_identity().shard_slug(),
timeline_id = %timeline_id,
@@ -1581,6 +1606,7 @@ impl PageServerHandler {
tenant_id,
timeline_id,
&mut timeline_handles,
&self.perf_span_fields,
&cancel,
ctx,
protocol_version,
@@ -1714,6 +1740,8 @@ impl PageServerHandler {
// Batcher
//
let perf_span_fields = self.perf_span_fields.clone();
let cancel_batcher = self.cancel.child_token();
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
@@ -1727,6 +1755,7 @@ impl PageServerHandler {
tenant_id,
timeline_id,
&mut timeline_handles,
&perf_span_fields,
&cancel_batcher,
&ctx,
protocol_version,
@@ -2669,12 +2698,14 @@ where
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(app_name) = params.get("application_name") {
self.perf_span_fields.application_name = Some(app_name.to_string());
Span::current().record("application_name", field::display(app_name));
}
if let Some(options) = params.get("options") {
let (config, _) = parse_options(options);
for (key, value) in config {
if key == "neon.compute_mode" {
self.perf_span_fields.compute_mode = Some(value.clone());
Span::current().record("compute_mode", field::display(value));
}
}

View File

@@ -691,7 +691,7 @@ impl Timeline {
Ok(buf.get_u32_le())
}
/// Get size of an SLRU segment
/// Does the slru segment exist?
pub(crate) async fn get_slru_segment_exists(
&self,
kind: SlruKind,
@@ -844,9 +844,9 @@ impl Timeline {
.await
}
/// Obtain the possible timestamp range for the given lsn.
/// Obtain the timestamp for the given lsn.
///
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
/// If the lsn has no timestamps (e.g. no commits), returns None.
pub(crate) async fn get_timestamp_for_lsn(
&self,
probe_lsn: Lsn,

View File

@@ -45,6 +45,7 @@ use remote_timeline_client::manifest::{
};
use remote_timeline_client::{
FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD, UploadQueueNotReadyError,
download_tenant_manifest,
};
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
@@ -99,7 +100,7 @@ use crate::tenant::timeline::delete::DeleteTimelineFlow;
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
use crate::virtual_file::VirtualFile;
use crate::walingest::WalLagCooldown;
use crate::walredo::PostgresRedoManager;
use crate::walredo::{PostgresRedoManager, RedoAttemptType};
use crate::{InitializationOrder, TEMP_FILE_SUFFIX, import_datadir, span, task_mgr, walredo};
static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
@@ -226,7 +227,8 @@ struct TimelinePreload {
}
pub(crate) struct TenantPreload {
tenant_manifest: TenantManifest,
/// The tenant manifest from remote storage, or None if no manifest was found.
tenant_manifest: Option<TenantManifest>,
/// Map from timeline ID to a possible timeline preload. It is None iff the timeline is offloaded according to the manifest.
timelines: HashMap<TimelineId, Option<TimelinePreload>>,
}
@@ -282,12 +284,15 @@ pub struct Tenant {
/// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
/// Serialize writes of the tenant manifest to remote storage. If there are concurrent operations
/// affecting the manifest, such as timeline deletion and timeline offload, they must wait for
/// each other (this could be optimized to coalesce writes if necessary).
/// The last tenant manifest known to be in remote storage. None if the manifest has not yet
/// been either downloaded or uploaded. Always Some after tenant attach.
///
/// The contents of the Mutex are the last manifest we successfully uploaded
tenant_manifest_upload: tokio::sync::Mutex<Option<TenantManifest>>,
/// Initially populated during tenant attach, updated via `maybe_upload_tenant_manifest`.
///
/// Do not modify this directly. It is used to check whether a new manifest needs to be
/// uploaded. The manifest is constructed in `build_tenant_manifest`, and uploaded via
/// `maybe_upload_tenant_manifest`.
remote_tenant_manifest: tokio::sync::Mutex<Option<TenantManifest>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
@@ -468,15 +473,16 @@ impl WalRedoManager {
base_img: Option<(Lsn, bytes::Bytes)>,
records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>,
pg_version: u32,
redo_attempt_type: RedoAttemptType,
) -> Result<bytes::Bytes, walredo::Error> {
match self {
Self::Prod(_, mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version)
mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type)
.await
}
#[cfg(test)]
Self::Test(mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version)
mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type)
.await
}
}
@@ -915,6 +921,7 @@ enum StartCreatingTimelineResult {
Idempotent(Arc<Timeline>),
}
#[allow(clippy::large_enum_variant, reason = "TODO")]
enum TimelineInitAndSyncResult {
ReadyToActivate(Arc<Timeline>),
NeedsSpawnImportPgdata(TimelineInitAndSyncNeedsSpawnImportPgdata),
@@ -1001,6 +1008,7 @@ enum CreateTimelineCause {
Delete,
}
#[allow(clippy::large_enum_variant, reason = "TODO")]
enum LoadTimelineCause {
Attach,
Unoffload,
@@ -1354,36 +1362,41 @@ impl Tenant {
}
}
// Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
enum BrokenVerbosity {
Error,
Info
}
let make_broken =
|t: &Tenant, err: anyhow::Error, verbosity: BrokenVerbosity| {
match verbosity {
BrokenVerbosity::Info => {
info!("attach cancelled, setting tenant state to Broken: {err}");
},
BrokenVerbosity::Error => {
error!("attach failed, setting tenant state to Broken: {err:?}");
}
fn make_broken_or_stopping(t: &Tenant, err: anyhow::Error) {
t.state.send_modify(|state| match state {
// TODO: the old code alluded to DeleteTenantFlow sometimes setting
// TenantState::Stopping before we get here, but this may be outdated.
// Let's find out with a testing assertion. If this doesn't fire, and the
// logs don't show this happening in production, remove the Stopping cases.
TenantState::Stopping{..} if cfg!(any(test, feature = "testing")) => {
panic!("unexpected TenantState::Stopping during attach")
}
t.state.send_modify(|state| {
// The Stopping case is for when we have passed control on to DeleteTenantFlow:
// if it errors, we will call make_broken when tenant is already in Stopping.
assert!(
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
"the attach task owns the tenant state until activation is complete"
);
*state = TenantState::broken_from_reason(err.to_string());
});
};
// If the tenant is cancelled, assume the error was caused by cancellation.
TenantState::Attaching if t.cancel.is_cancelled() => {
info!("attach cancelled, setting tenant state to Stopping: {err}");
// NB: progress None tells `set_stopping` that attach has cancelled.
*state = TenantState::Stopping { progress: None };
}
// According to the old code, DeleteTenantFlow may already have set this to
// Stopping. Retain its progress.
// TODO: there is no DeleteTenantFlow. Is this still needed? See above.
TenantState::Stopping { progress } if t.cancel.is_cancelled() => {
assert!(progress.is_some(), "concurrent attach cancellation");
info!("attach cancelled, already Stopping: {err}");
}
// Mark the tenant as broken.
TenantState::Attaching | TenantState::Stopping { .. } => {
error!("attach failed, setting tenant state to Broken (was {state}): {err:?}");
*state = TenantState::broken_from_reason(err.to_string())
}
// The attach task owns the tenant state until activated.
state => panic!("invalid tenant state {state} during attach: {err:?}"),
});
}
// TODO: should also be rejecting tenant conf changes that violate this check.
if let Err(e) = crate::tenant::storage_layer::inmemory_layer::IndexEntry::validate_checkpoint_distance(tenant_clone.get_checkpoint_distance()) {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
make_broken_or_stopping(&tenant_clone, anyhow::anyhow!(e));
return Ok(());
}
@@ -1435,10 +1448,8 @@ impl Tenant {
// stayed in Activating for such a long time that shutdown found it in
// that state.
tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
// Make the tenant broken so that set_stopping will not hang waiting for it to leave
// the Attaching state. This is an over-reaction (nothing really broke, the tenant is
// just shutting down), but ensures progress.
make_broken(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"), BrokenVerbosity::Info);
// Set the tenant to Stopping to signal `set_stopping` that we're done.
make_broken_or_stopping(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"));
return Ok(());
},
)
@@ -1457,7 +1468,7 @@ impl Tenant {
match res {
Ok(p) => Some(p),
Err(e) => {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
make_broken_or_stopping(&tenant_clone, anyhow::anyhow!(e));
return Ok(());
}
}
@@ -1483,9 +1494,7 @@ impl Tenant {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
}
Err(e) => {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
}
Err(e) => make_broken_or_stopping(&tenant_clone, anyhow::anyhow!(e)),
}
// If we are doing an opportunistic warmup attachment at startup, initialize
@@ -1525,28 +1534,27 @@ impl Tenant {
cancel.clone(),
)
.await?;
let (offloaded_add, tenant_manifest) =
match remote_timeline_client::download_tenant_manifest(
remote_storage,
&self.tenant_shard_id,
self.generation,
&cancel,
)
.await
{
Ok((tenant_manifest, _generation, _manifest_mtime)) => (
format!("{} offloaded", tenant_manifest.offloaded_timelines.len()),
tenant_manifest,
),
Err(DownloadError::NotFound) => {
("no manifest".to_string(), TenantManifest::empty())
}
Err(e) => Err(e)?,
};
let tenant_manifest = match download_tenant_manifest(
remote_storage,
&self.tenant_shard_id,
self.generation,
&cancel,
)
.await
{
Ok((tenant_manifest, _, _)) => Some(tenant_manifest),
Err(DownloadError::NotFound) => None,
Err(err) => return Err(err.into()),
};
info!(
"found {} timelines, and {offloaded_add}",
remote_timeline_ids.len()
"found {} timelines ({} offloaded timelines)",
remote_timeline_ids.len(),
tenant_manifest
.as_ref()
.map(|m| m.offloaded_timelines.len())
.unwrap_or(0)
);
for k in other_keys {
@@ -1555,11 +1563,13 @@ impl Tenant {
// Avoid downloading IndexPart of offloaded timelines.
let mut offloaded_with_prefix = HashSet::new();
for offloaded in tenant_manifest.offloaded_timelines.iter() {
if remote_timeline_ids.remove(&offloaded.timeline_id) {
offloaded_with_prefix.insert(offloaded.timeline_id);
} else {
// We'll take care later of timelines in the manifest without a prefix
if let Some(tenant_manifest) = &tenant_manifest {
for offloaded in tenant_manifest.offloaded_timelines.iter() {
if remote_timeline_ids.remove(&offloaded.timeline_id) {
offloaded_with_prefix.insert(offloaded.timeline_id);
} else {
// We'll take care later of timelines in the manifest without a prefix
}
}
}
@@ -1633,12 +1643,14 @@ impl Tenant {
let mut offloaded_timeline_ids = HashSet::new();
let mut offloaded_timelines_list = Vec::new();
for timeline_manifest in preload.tenant_manifest.offloaded_timelines.iter() {
let timeline_id = timeline_manifest.timeline_id;
let offloaded_timeline =
OffloadedTimeline::from_manifest(self.tenant_shard_id, timeline_manifest);
offloaded_timelines_list.push((timeline_id, Arc::new(offloaded_timeline)));
offloaded_timeline_ids.insert(timeline_id);
if let Some(tenant_manifest) = &preload.tenant_manifest {
for timeline_manifest in tenant_manifest.offloaded_timelines.iter() {
let timeline_id = timeline_manifest.timeline_id;
let offloaded_timeline =
OffloadedTimeline::from_manifest(self.tenant_shard_id, timeline_manifest);
offloaded_timelines_list.push((timeline_id, Arc::new(offloaded_timeline)));
offloaded_timeline_ids.insert(timeline_id);
}
}
// Complete deletions for offloaded timeline id's from manifest.
// The manifest will be uploaded later in this function.
@@ -1796,15 +1808,21 @@ impl Tenant {
.context("resume_deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
let needs_manifest_upload =
offloaded_timelines_list.len() != preload.tenant_manifest.offloaded_timelines.len();
{
let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap();
offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter());
}
if needs_manifest_upload {
self.store_tenant_manifest().await?;
// Stash the preloaded tenant manifest, and upload a new manifest if changed.
//
// NB: this must happen after the tenant is fully populated above. In particular the
// offloaded timelines, which are included in the manifest.
{
let mut guard = self.remote_tenant_manifest.lock().await;
assert!(guard.is_none(), "tenant manifest set before preload"); // first populated here
*guard = preload.tenant_manifest;
}
self.maybe_upload_tenant_manifest().await?;
// The local filesystem contents are a cache of what's in the remote IndexPart;
// IndexPart is the source of truth.
@@ -2218,7 +2236,7 @@ impl Tenant {
};
// Upload new list of offloaded timelines to S3
self.store_tenant_manifest().await?;
self.maybe_upload_tenant_manifest().await?;
// Activate the timeline (if it makes sense)
if !(timeline.is_broken() || timeline.is_stopping()) {
@@ -3429,7 +3447,7 @@ impl Tenant {
shutdown_mode
};
match self.set_stopping(shutdown_progress, false, false).await {
match self.set_stopping(shutdown_progress).await {
Ok(()) => {}
Err(SetStoppingError::Broken) => {
// assume that this is acceptable
@@ -3509,25 +3527,13 @@ impl Tenant {
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// This function is not cancel-safe!
///
/// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
/// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
async fn set_stopping(
&self,
progress: completion::Barrier,
_allow_transition_from_loading: bool,
allow_transition_from_attaching: bool,
) -> Result<(), SetStoppingError> {
async fn set_stopping(&self, progress: completion::Barrier) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
TenantState::Attaching if allow_transition_from_attaching => true,
TenantState::Activating(_) | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
info!("waiting for {state} to turn Active|Broken|Stopping");
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
@@ -3538,25 +3544,24 @@ impl Tenant {
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
TenantState::Activating(_) => {
unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Attaching => {
if !allow_transition_from_attaching {
unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
};
*current_state = TenantState::Stopping { progress };
true
TenantState::Activating(_) | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
// are created after the transition to Stopping. That's harmless, as the Timelines
// won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
*current_state = TenantState::Stopping { progress };
*current_state = TenantState::Stopping { progress: Some(progress) };
// Continue stopping outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
true
}
TenantState::Stopping { progress: None } => {
// An attach was cancelled, and the attach transitioned the tenant from Attaching to
// Stopping(None) to let us know it exited. Register our progress and continue.
*current_state = TenantState::Stopping { progress: Some(progress) };
true
}
TenantState::Broken { reason, .. } => {
info!(
"Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
@@ -3564,7 +3569,7 @@ impl Tenant {
err = Some(SetStoppingError::Broken);
false
}
TenantState::Stopping { progress } => {
TenantState::Stopping { progress: Some(progress) } => {
info!("Tenant is already in Stopping state");
err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
false
@@ -4065,18 +4070,20 @@ impl Tenant {
/// Generate an up-to-date TenantManifest based on the state of this Tenant.
fn build_tenant_manifest(&self) -> TenantManifest {
let timelines_offloaded = self.timelines_offloaded.lock().unwrap();
let mut timeline_manifests = timelines_offloaded
.iter()
.map(|(_timeline_id, offloaded)| offloaded.manifest())
.collect::<Vec<_>>();
// Sort the manifests so that our output is deterministic
timeline_manifests.sort_by_key(|timeline_manifest| timeline_manifest.timeline_id);
// Collect the offloaded timelines, and sort them for deterministic output.
let offloaded_timelines = self
.timelines_offloaded
.lock()
.unwrap()
.values()
.map(|tli| tli.manifest())
.sorted_by_key(|m| m.timeline_id)
.collect_vec();
TenantManifest {
version: LATEST_TENANT_MANIFEST_VERSION,
offloaded_timelines: timeline_manifests,
stripe_size: Some(self.get_shard_stripe_size()),
offloaded_timelines,
}
}
@@ -4299,7 +4306,7 @@ impl Tenant {
timelines: Mutex::new(HashMap::new()),
timelines_creating: Mutex::new(HashSet::new()),
timelines_offloaded: Mutex::new(HashMap::new()),
tenant_manifest_upload: Default::default(),
remote_tenant_manifest: Default::default(),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
@@ -4395,10 +4402,7 @@ impl Tenant {
.to_string();
fail::fail_point!("tenant-config-before-write", |_| {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"tenant-config-before-write",
))
Err(std::io::Error::other("tenant-config-before-write"))
});
// Convert the config to a toml file.
@@ -5532,27 +5536,35 @@ impl Tenant {
.unwrap_or(0)
}
/// Serialize and write the latest TenantManifest to remote storage.
pub(crate) async fn store_tenant_manifest(&self) -> Result<(), TenantManifestError> {
// Only one manifest write may be done at at time, and the contents of the manifest
// must be loaded while holding this lock. This makes it safe to call this function
// from anywhere without worrying about colliding updates.
/// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant
/// manifest in `Self::remote_tenant_manifest`.
///
/// TODO: instead of requiring callers to remember to call `maybe_upload_tenant_manifest` after
/// changing any `Tenant` state that's included in the manifest, consider making the manifest
/// the authoritative source of data with an API that automatically uploads on changes. Revisit
/// this when the manifest is more widely used and we have a better idea of the data model.
pub(crate) async fn maybe_upload_tenant_manifest(&self) -> Result<(), TenantManifestError> {
// Multiple tasks may call this function concurrently after mutating the Tenant runtime
// state, affecting the manifest generated by `build_tenant_manifest`. We use an async mutex
// to serialize these callers. `eq_ignoring_version` acts as a slightly inefficient but
// simple coalescing mechanism.
let mut guard = tokio::select! {
g = self.tenant_manifest_upload.lock() => {
g
},
_ = self.cancel.cancelled() => {
return Err(TenantManifestError::Cancelled);
}
guard = self.remote_tenant_manifest.lock() => guard,
_ = self.cancel.cancelled() => return Err(TenantManifestError::Cancelled),
};
// Build a new manifest.
let manifest = self.build_tenant_manifest();
if Some(&manifest) == (*guard).as_ref() {
// Optimisation: skip uploads that don't change anything.
return Ok(());
// Check if the manifest has changed. We ignore the version number here, to avoid
// uploading every manifest on version number bumps.
if let Some(old) = guard.as_ref() {
if manifest.eq_ignoring_version(old) {
return Ok(());
}
}
// Remote storage does no retries internally, so wrap it
// Upload the manifest. Remote storage does no retries internally, so retry here.
match backoff::retry(
|| async {
upload_tenant_manifest(
@@ -5564,7 +5576,7 @@ impl Tenant {
)
.await
},
|_e| self.cancel.is_cancelled(),
|_| self.cancel.is_cancelled(),
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"uploading tenant manifest",
@@ -5868,6 +5880,7 @@ pub(crate) mod harness {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
_redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, walredo::Error> {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
if records_neon {
@@ -8722,6 +8735,21 @@ mod tests {
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init("i")),
),
(
get_key(4),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append_conditional("j", "i")),
),
(
get_key(5),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init("1")),
),
(
get_key(5),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append_conditional("j", "2")),
),
];
let image1 = vec![(get_key(1), "0x10".into())];
@@ -8752,8 +8780,18 @@ mod tests {
// Need to remove the limit of "Neon WAL redo requires base image".
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
assert_eq!(
tline.get(get_key(3), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"c")
);
assert_eq!(
tline.get(get_key(4), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"ij")
);
// Manual testing required: currently, read errors will panic the process in debug mode. So we
// cannot enable this assertion in the unit test.
// assert!(tline.get(get_key(5), Lsn(0x50), &ctx).await.is_err());
Ok(())
}

View File

@@ -15,7 +15,7 @@
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use std::cmp::min;
use std::io::{Error, ErrorKind};
use std::io::Error;
use async_compression::Level;
use bytes::{BufMut, BytesMut};
@@ -331,10 +331,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
return (
(
io_buf.slice_len(),
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
Err(Error::other(format!("blob too large ({len} bytes)"))),
),
srcbuf,
);

View File

@@ -216,12 +216,8 @@ impl<'a> FileBlockReader<'a> {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
.map_err(|e| std::io::Error::other(format!("Failed to read immutable buf: {e:#}")))?
{
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer

View File

@@ -642,6 +642,7 @@ impl RemoteTimelineClient {
cancel,
)
.measure_remote_op(
Option::<TaskKind>::None,
RemoteOpFileKind::Index,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
@@ -739,6 +740,7 @@ impl RemoteTimelineClient {
ctx,
)
.measure_remote_op(
Some(ctx.task_kind()),
RemoteOpFileKind::Layer,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
@@ -2175,6 +2177,7 @@ impl RemoteTimelineClient {
&self.cancel,
)
.measure_remote_op(
Some(TaskKind::RemoteUploadTask),
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
Arc::clone(&self.metrics),
@@ -2191,6 +2194,7 @@ impl RemoteTimelineClient {
&self.cancel,
)
.measure_remote_op(
Some(TaskKind::RemoteUploadTask),
RemoteOpFileKind::Index,
RemoteOpKind::Upload,
Arc::clone(&self.metrics),

View File

@@ -1,21 +1,33 @@
use chrono::NaiveDateTime;
use pageserver_api::shard::ShardStripeSize;
use serde::{Deserialize, Serialize};
use utils::id::TimelineId;
use utils::lsn::Lsn;
/// Tenant-shard scoped manifest
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
/// Tenant shard manifest, stored in remote storage. Contains offloaded timelines and other tenant
/// shard-wide information that must be persisted in remote storage.
///
/// The manifest is always updated on tenant attach, and as needed.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TenantManifest {
/// Debugging aid describing the version of this manifest.
/// Can also be used for distinguishing breaking changes later on.
/// The manifest version. Incremented on manifest format changes, even non-breaking ones.
/// Manifests must generally always be backwards and forwards compatible for one release, to
/// allow release rollbacks.
pub version: usize,
/// This tenant's stripe size. This is only advisory, and used to recover tenant data from
/// remote storage. The autoritative source is the storage controller. If None, assume the
/// original default value of 32768 blocks (256 MB).
#[serde(skip_serializing_if = "Option::is_none")]
pub stripe_size: Option<ShardStripeSize>,
/// The list of offloaded timelines together with enough information
/// to not have to actually load them.
///
/// Note: the timelines mentioned in this list might be deleted, i.e.
/// we don't hold an invariant that the references aren't dangling.
/// Existence of index-part.json is the actual indicator of timeline existence.
#[serde(default)]
pub offloaded_timelines: Vec<OffloadedTimelineManifest>,
}
@@ -24,7 +36,7 @@ pub struct TenantManifest {
/// Very similar to [`pageserver_api::models::OffloadedTimelineInfo`],
/// but the two datastructures serve different needs, this is for a persistent disk format
/// that must be backwards compatible, while the other is only for informative purposes.
#[derive(Clone, Serialize, Deserialize, Copy, PartialEq, Eq)]
#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq, Eq)]
pub struct OffloadedTimelineManifest {
pub timeline_id: TimelineId,
/// Whether the timeline has a parent it has been branched off from or not
@@ -35,20 +47,166 @@ pub struct OffloadedTimelineManifest {
pub archived_at: NaiveDateTime,
}
pub const LATEST_TENANT_MANIFEST_VERSION: usize = 1;
/// The newest manifest version. This should be incremented on changes, even non-breaking ones. We
/// do not use deny_unknown_fields, so new fields are not breaking.
///
/// 1: initial version
/// 2: +stripe_size
///
/// When adding new versions, also add a parse_vX test case below.
pub const LATEST_TENANT_MANIFEST_VERSION: usize = 2;
impl TenantManifest {
pub(crate) fn empty() -> Self {
Self {
version: LATEST_TENANT_MANIFEST_VERSION,
offloaded_timelines: vec![],
/// Returns true if the manifests are equal, ignoring the version number. This avoids
/// re-uploading all manifests just because the version number is bumped.
pub fn eq_ignoring_version(&self, other: &Self) -> bool {
// Fast path: if the version is equal, just compare directly.
if self.version == other.version {
return self == other;
}
}
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice::<Self>(bytes)
// We could alternatively just clone and modify the version here.
let Self {
version: _, // ignore version
stripe_size,
offloaded_timelines,
} = self;
stripe_size == &other.stripe_size && offloaded_timelines == &other.offloaded_timelines
}
pub(crate) fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
/// Decodes a manifest from JSON.
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(bytes)
}
/// Encodes a manifest as JSON.
pub fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use utils::id::TimelineId;
use super::*;
/// Empty manifests should be parsed. Version is required.
#[test]
fn parse_empty() -> anyhow::Result<()> {
let json = r#"{
"version": 0
}"#;
let expected = TenantManifest {
version: 0,
stripe_size: None,
offloaded_timelines: Vec::new(),
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
/// Unknown fields should be ignored, for forwards compatibility.
#[test]
fn parse_unknown_fields() -> anyhow::Result<()> {
let json = r#"{
"version": 1,
"foo": "bar"
}"#;
let expected = TenantManifest {
version: 1,
stripe_size: None,
offloaded_timelines: Vec::new(),
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
/// v1 manifests should be parsed, for backwards compatibility.
#[test]
fn parse_v1() -> anyhow::Result<()> {
let json = r#"{
"version": 1,
"offloaded_timelines": [
{
"timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"archived_at": "2025-03-07T11:07:11.373105434"
},
{
"timeline_id": "f3def5823ad7080d2ea538d8e12163fa",
"ancestor_timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"ancestor_retain_lsn": "0/1F79038",
"archived_at": "2025-03-05T11:10:22.257901390"
}
]
}"#;
let expected = TenantManifest {
version: 1,
stripe_size: None,
offloaded_timelines: vec![
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("5c4df612fd159e63c1b7853fe94d97da")?,
ancestor_timeline_id: None,
ancestor_retain_lsn: None,
archived_at: NaiveDateTime::from_str("2025-03-07T11:07:11.373105434")?,
},
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("f3def5823ad7080d2ea538d8e12163fa")?,
ancestor_timeline_id: Some(TimelineId::from_str(
"5c4df612fd159e63c1b7853fe94d97da",
)?),
ancestor_retain_lsn: Some(Lsn::from_str("0/1F79038")?),
archived_at: NaiveDateTime::from_str("2025-03-05T11:10:22.257901390")?,
},
],
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
/// v2 manifests should be parsed, for backwards compatibility.
#[test]
fn parse_v2() -> anyhow::Result<()> {
let json = r#"{
"version": 2,
"stripe_size": 32768,
"offloaded_timelines": [
{
"timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"archived_at": "2025-03-07T11:07:11.373105434"
},
{
"timeline_id": "f3def5823ad7080d2ea538d8e12163fa",
"ancestor_timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"ancestor_retain_lsn": "0/1F79038",
"archived_at": "2025-03-05T11:10:22.257901390"
}
]
}"#;
let expected = TenantManifest {
version: 2,
stripe_size: Some(ShardStripeSize(32768)),
offloaded_timelines: vec![
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("5c4df612fd159e63c1b7853fe94d97da")?,
ancestor_timeline_id: None,
ancestor_retain_lsn: None,
archived_at: NaiveDateTime::from_str("2025-03-07T11:07:11.373105434")?,
},
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("f3def5823ad7080d2ea538d8e12163fa")?,
ancestor_timeline_id: Some(TimelineId::from_str(
"5c4df612fd159e63c1b7853fe94d97da",
)?),
ancestor_retain_lsn: Some(Lsn::from_str("0/1F79038")?),
archived_at: NaiveDateTime::from_str("2025-03-05T11:10:22.257901390")?,
},
],
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
}

View File

@@ -61,6 +61,7 @@ pub(crate) async fn upload_index_part(
.await
.with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
}
/// Serializes and uploads the given tenant manifest data to the remote storage.
pub(crate) async fn upload_tenant_manifest(
storage: &GenericRemoteStorage,
@@ -76,16 +77,14 @@ pub(crate) async fn upload_tenant_manifest(
});
pausable_failpoint!("before-upload-manifest-pausable");
let serialized = tenant_manifest.to_json_bytes()?;
let serialized = Bytes::from(serialized);
let tenant_manifest_site = serialized.len();
let serialized = Bytes::from(tenant_manifest.to_json_bytes()?);
let tenant_manifest_size = serialized.len();
let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
storage
.upload_storage_object(
futures::stream::once(futures::future::ready(Ok(serialized))),
tenant_manifest_site,
tenant_manifest_size,
&remote_path,
cancel,
)

View File

@@ -366,7 +366,7 @@ impl SplitDeltaLayerWriter {
)
.await?;
let (start_key, prev_delta_writer) =
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
self.inner.replace((key, next_delta_writer)).unwrap();
self.batches.add_unfinished_delta_writer(
prev_delta_writer,
start_key..key,

View File

@@ -766,7 +766,7 @@ mod tests {
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[len..]); // to discover bugs
Ok((dst, len))
}
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
Err(e) => Err(std::io::Error::other(e)),
}
}
}

View File

@@ -975,6 +975,10 @@ impl LayerInner {
allow_download: bool,
ctx: &RequestContext,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
let mut wait_for_download_recorder =
scopeguard::guard(utils::elapsed_accum::ElapsedAccum::default(), |accum| {
ctx.ondemand_download_wait_observe(accum.get());
});
let (weak, permit) = {
// get_or_init_detached can:
// - be fast (mutex lock) OR uncontested semaphore permit acquire
@@ -983,7 +987,7 @@ impl LayerInner {
let locked = self
.inner
.get_or_init_detached()
.get_or_init_detached_measured(Some(&mut wait_for_download_recorder))
.await
.map(|mut guard| guard.get_and_upgrade().ok_or(guard));
@@ -1013,6 +1017,7 @@ impl LayerInner {
Err(permit) => (None, permit),
}
};
let _guard = wait_for_download_recorder.guard();
if let Some(weak) = weak {
// only drop the weak after dropping the heavier_once_cell guard
@@ -1202,6 +1207,7 @@ impl LayerInner {
permit: heavier_once_cell::InitPermit,
ctx: &RequestContext,
) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
let start = std::time::Instant::now();
let result = timeline
.remote_client
.download_layer_file(
@@ -1213,7 +1219,8 @@ impl LayerInner {
ctx,
)
.await;
let latency = start.elapsed();
let latency_millis = u64::try_from(latency.as_millis()).unwrap();
match result {
Ok(size) => {
assert_eq!(size, self.desc.file_size);
@@ -1229,9 +1236,8 @@ impl LayerInner {
Err(e) => {
panic!("post-condition failed: needs_download errored: {e:?}");
}
}
tracing::info!(size=%self.desc.file_size, "on-demand download successful");
};
tracing::info!(size=%self.desc.file_size, %latency_millis, "on-demand download successful");
timeline
.metrics
.resident_physical_size_add(self.desc.file_size);
@@ -1260,7 +1266,7 @@ impl LayerInner {
return Err(e);
}
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
tracing::error!(consecutive_failures, %latency_millis, "layer file download failed: {e:#}");
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,

View File

@@ -59,6 +59,7 @@ impl LayerIterRef<'_> {
/// 1. Unified iterator for image and delta layers.
/// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
/// 3. Lazy creation of the real delta/image iterator.
#[allow(clippy::large_enum_variant, reason = "TODO")]
pub(crate) enum IteratorWrapper<'a> {
NotLoaded {
ctx: &'a RequestContext,

View File

@@ -268,7 +268,12 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
error_run += 1;
let backoff =
exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
log_compaction_error(&err, Some((error_run, backoff)), cancel.is_cancelled());
log_compaction_error(
&err,
Some((error_run, backoff)),
cancel.is_cancelled(),
false,
);
continue;
}
}
@@ -285,6 +290,7 @@ pub(crate) fn log_compaction_error(
err: &CompactionError,
retry_info: Option<(u32, Duration)>,
task_cancelled: bool,
degrade_to_warning: bool,
) {
use CompactionError::*;
@@ -333,6 +339,7 @@ pub(crate) fn log_compaction_error(
}
} else {
match level {
Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
Level::ERROR => error!("Compaction failed: {err:#}"),
Level::INFO => info!("Compaction failed: {err:#}"),
level => unimplemented!("unexpected level {level:?}"),

View File

@@ -24,6 +24,7 @@ use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::PERF_TRACE_TARGET;
use crate::walredo::RedoAttemptType;
use anyhow::{Context, Result, anyhow, bail, ensure};
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
@@ -115,7 +116,7 @@ use crate::pgdatadir_mapping::{
use crate::task_mgr::TaskKind;
use crate::tenant::config::AttachmentMode;
use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::{LayerMap, SearchResult};
use crate::tenant::layer_map::LayerMap;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
@@ -1039,6 +1040,7 @@ pub(crate) enum ShutdownMode {
Hard,
}
#[allow(clippy::large_enum_variant, reason = "TODO")]
enum ImageLayerCreationOutcome {
/// We generated an image layer
Generated {
@@ -1292,6 +1294,12 @@ impl Timeline {
};
reconstruct_state.read_path = read_path;
let redo_attempt_type = if ctx.task_kind() == TaskKind::Compaction {
RedoAttemptType::LegacyCompaction
} else {
RedoAttemptType::ReadPage
};
let traversal_res: Result<(), _> = {
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
@@ -1379,7 +1387,7 @@ impl Timeline {
let walredo_deltas = converted.num_deltas();
let walredo_res = walredo_self
.reconstruct_value(key, lsn, converted)
.reconstruct_value(key, lsn, converted, redo_attempt_type)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
@@ -1940,7 +1948,7 @@ impl Timeline {
)
.await;
if let Err(err) = &res {
log_compaction_error(err, None, cancel.is_cancelled());
log_compaction_error(err, None, cancel.is_cancelled(), false);
}
res
}
@@ -4104,12 +4112,6 @@ impl Timeline {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<TimelineVisitOutcome, GetVectoredError> {
let mut unmapped_keyspace = keyspace.clone();
let mut fringe = LayerFringe::new();
let mut completed_keyspace = KeySpace::default();
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
// Prevent GC from progressing while visiting the current timeline.
// If we are GC-ing because a new image layer was added while traversing
// the timeline, then it will remove layers that are required for fulfilling
@@ -4120,11 +4122,44 @@ impl Timeline {
// See `compaction::compact_with_gc` for why we need this.
let _guard = timeline.gc_compaction_layer_update_lock.read().await;
loop {
// Initialize the fringe
let mut fringe = {
let mut fringe = LayerFringe::new();
let guard = timeline.layers.read().await;
guard.update_search_fringe(&keyspace, cont_lsn, &mut fringe)?;
fringe
};
let mut completed_keyspace = KeySpace::default();
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
while let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
if cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
if let Some(ref mut read_path) = reconstruct_state.read_path {
read_path.record_layer_visit(&layer_to_read, &keyspace_to_read, &lsn_range);
}
// Visit the layer and plan IOs for it
let next_cont_lsn = lsn_range.start;
layer_to_read
.get_values_reconstruct_data(
keyspace_to_read.clone(),
lsn_range,
reconstruct_state,
ctx,
)
.await?;
let mut unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited(&layer_to_read);
let (keys_done_last_step, keys_with_image_coverage) =
reconstruct_state.consume_done_keys();
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
@@ -4135,31 +4170,15 @@ impl Timeline {
image_covered_keyspace.add_range(keys_with_image_coverage);
}
// Query the layer map for the next layers to read.
//
// Do not descent any further if the last layer we visited
// completed all keys in the keyspace it inspected. This is not
// required for correctness, but avoids visiting extra layers
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read().await;
let layers = guard.layer_map()?;
for range in unmapped_keyspace.ranges.iter() {
let results = layers.range_search(range.clone(), cont_lsn);
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
guard.upgrade(layer),
keyspace_accum.to_keyspace(),
lsn_floor..cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| {
fringe.update(layer, keyspace, lsn_range)
});
}
guard.update_search_fringe(&unmapped_keyspace, cont_lsn, &mut fringe)?;
// It's safe to drop the layer map lock after planning the next round of reads.
// The fringe keeps readable handles for the layers which are safe to read even
@@ -4173,28 +4192,6 @@ impl Timeline {
// at two different time points.
drop(guard);
}
if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
if let Some(ref mut read_path) = reconstruct_state.read_path {
read_path.record_layer_visit(&layer_to_read, &keyspace_to_read, &lsn_range);
}
let next_cont_lsn = lsn_range.start;
layer_to_read
.get_values_reconstruct_data(
keyspace_to_read.clone(),
lsn_range,
reconstruct_state,
ctx,
)
.await?;
unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited(&layer_to_read);
} else {
break;
}
}
Ok(TimelineVisitOutcome {
@@ -6357,10 +6354,17 @@ impl Timeline {
key: Key,
request_lsn: Lsn,
mut data: ValueReconstructState,
redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, PageReconstructError> {
// Perform WAL redo if needed
data.records.reverse();
let fire_critical_error = match redo_attempt_type {
RedoAttemptType::ReadPage => true,
RedoAttemptType::LegacyCompaction => true,
RedoAttemptType::GcCompaction => false,
};
// If we have a page image, and no WAL, we're all set
if data.records.is_empty() {
if let Some((img_lsn, img)) = &data.img {
@@ -6407,13 +6411,22 @@ impl Timeline {
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.request_redo(
key,
request_lsn,
data.img,
data.records,
self.pg_version,
redo_attempt_type,
)
.await;
let img = match res {
Ok(img) => img,
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
Err(walredo::Error::Other(err)) => {
critical!("walredo failure during page reconstruction: {err:?}");
if fire_critical_error {
critical!("walredo failure during page reconstruction: {err:?}");
}
return Err(PageReconstructError::WalRedo(
err.context("reconstruct a page image"),
));

View File

@@ -7,7 +7,7 @@
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use super::layer_manager::LayerManager;
use super::{
@@ -16,6 +16,8 @@ use super::{
Timeline,
};
use crate::tenant::timeline::DeltaEntry;
use crate::walredo::RedoAttemptType;
use anyhow::{Context, anyhow};
use bytes::Bytes;
use enumset::EnumSet;
@@ -315,6 +317,9 @@ impl GcCompactionQueue {
flags: {
let mut flags = EnumSet::new();
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
if timeline.get_compaction_l0_first() {
flags |= CompactFlags::YieldForL0;
}
flags
},
sub_compaction: true,
@@ -448,7 +453,7 @@ impl GcCompactionQueue {
) -> Result<CompactionOutcome, CompactionError> {
let res = self.iteration_inner(cancel, ctx, gc_block, timeline).await;
if let Err(err) = &res {
log_compaction_error(err, None, cancel.is_cancelled());
log_compaction_error(err, None, cancel.is_cancelled(), true);
}
match res {
Ok(res) => Ok(res),
@@ -819,15 +824,16 @@ pub struct CompactionStatistics {
time_acquire_lock_secs: f64,
time_analyze_secs: f64,
time_download_layer_secs: f64,
time_to_first_kv_pair_secs: f64,
time_main_loop_secs: f64,
time_final_phase_secs: f64,
time_total_secs: f64,
// Summary
/// Ratio of the key-value size before/after gc-compaction.
uncompressed_size_ratio: f64,
/// Ratio of the physical size before/after gc-compaction.
physical_size_ratio: f64,
/// Ratio of the key-value size after/before gc-compaction.
uncompressed_retention_ratio: f64,
/// Ratio of the physical size after/before gc-compaction.
compressed_retention_ratio: f64,
}
impl CompactionStatistics {
@@ -896,15 +902,15 @@ impl CompactionStatistics {
fn finalize(&mut self) {
let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
self.uncompressed_size_ratio =
original_key_value_size as f64 / (produced_key_value_size as f64 + 1.0); // avoid div by 0
self.uncompressed_retention_ratio =
produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0
let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
let produced_physical_size = self.image_layer_produced.size
+ self.delta_layer_produced.size
+ self.image_layer_discarded.size
+ self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
self.physical_size_ratio =
original_physical_size as f64 / (produced_physical_size as f64 + 1.0); // avoid div by 0
self.compressed_retention_ratio =
produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0
}
}
@@ -1244,6 +1250,10 @@ impl Timeline {
let mut replace_image_layers = Vec::new();
for layer in layers_to_rewrite {
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
@@ -2406,7 +2416,9 @@ impl Timeline {
} else {
lsn_split_points[i]
};
let img = self.reconstruct_value(key, request_lsn, state).await?;
let img = self
.reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction)
.await?;
Some((request_lsn, img))
} else {
None
@@ -3026,7 +3038,7 @@ impl Timeline {
.map_err(CompactionError::Other)?;
let time_download_layer = timer.elapsed();
let timer = Instant::now();
let mut timer = Instant::now();
// Step 2: Produce images+deltas.
let mut accumulated_values = Vec::new();
@@ -3101,8 +3113,7 @@ impl Timeline {
// Actually, we can decide not to write to the image layer at all at this point because
// the key and LSN range are determined. However, to keep things simple here, we still
// create this writer, and discard the writer in the end.
let mut keys_processed = 0;
let mut time_to_first_kv_pair = None;
while let Some(((key, lsn, val), desc)) = merge_iter
.next_with_trace()
@@ -3110,13 +3121,16 @@ impl Timeline {
.context("failed to get next key-value pair")
.map_err(CompactionError::Other)?
{
if time_to_first_kv_pair.is_none() {
time_to_first_kv_pair = Some(timer.elapsed());
timer = Instant::now();
}
if cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
keys_processed += 1;
let should_yield = yield_for_l0
&& keys_processed % 1000 == 0
&& self
.l0_compaction_trigger
.notified()
@@ -3447,6 +3461,9 @@ impl Timeline {
let time_final_phase = timer.elapsed();
stat.time_final_phase_secs = time_final_phase.as_secs_f64();
stat.time_to_first_kv_pair_secs = time_to_first_kv_pair
.unwrap_or(Duration::ZERO)
.as_secs_f64();
stat.time_main_loop_secs = time_main_loop.as_secs_f64();
stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
stat.time_download_layer_secs = time_download_layer.as_secs_f64();
@@ -3907,8 +3924,6 @@ impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
}
}
use crate::tenant::timeline::DeltaEntry;
impl CompactionLayer<Key> for ResidentDeltaLayer {
fn key_range(&self) -> &Range<Key> {
&self.0.layer_desc().key_range

View File

@@ -410,10 +410,13 @@ impl DeleteTimelineFlow {
// So indeed, the tenant manifest might refer to an offloaded timeline which has already been deleted.
// However, we handle this case in tenant loading code so the next time we attach, the issue is
// resolved.
tenant.store_tenant_manifest().await.map_err(|e| match e {
TenantManifestError::Cancelled => DeleteTimelineError::Cancelled,
_ => DeleteTimelineError::Other(e.into()),
})?;
tenant
.maybe_upload_tenant_manifest()
.await
.map_err(|err| match err {
TenantManifestError::Cancelled => DeleteTimelineError::Cancelled,
err => DeleteTimelineError::Other(err.into()),
})?;
*guard = Self::Finished;

View File

@@ -3,17 +3,18 @@ use std::sync::Arc;
use anyhow::{Context, bail, ensure};
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::TenantShardId;
use tokio_util::sync::CancellationToken;
use tracing::trace;
use utils::id::TimelineId;
use utils::lsn::{AtomicLsn, Lsn};
use super::{ReadableLayer, TimelineWriterState};
use super::{LayerFringe, ReadableLayer, TimelineWriterState};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::metrics::TimelineMetrics;
use crate::tenant::layer_map::{BatchedUpdates, LayerMap};
use crate::tenant::layer_map::{BatchedUpdates, LayerMap, SearchResult};
use crate::tenant::storage_layer::{
AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
@@ -38,7 +39,7 @@ impl Default for LayerManager {
}
impl LayerManager {
pub(crate) fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
match weak {
ReadableLayerWeak::PersistentLayer(desc) => {
ReadableLayer::PersistentLayer(self.get_from_desc(&desc))
@@ -147,6 +148,36 @@ impl LayerManager {
self.layers().keys().cloned().collect_vec()
}
/// Update the [`LayerFringe`] of a read request
///
/// Take a key space at a given LSN and query the layer map below each range
/// of the key space to find the next layers to visit.
pub(crate) fn update_search_fringe(
&self,
keyspace: &KeySpace,
cont_lsn: Lsn,
fringe: &mut LayerFringe,
) -> Result<(), Shutdown> {
let map = self.layer_map()?;
for range in keyspace.ranges.iter() {
let results = map.range_search(range.clone(), cont_lsn);
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
self.upgrade(layer),
keyspace_accum.to_keyspace(),
lsn_floor..cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| fringe.update(layer, keyspace, lsn_range));
}
Ok(())
}
fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
use LayerManager::*;
match self {

View File

@@ -111,7 +111,7 @@ pub(crate) async fn offload_timeline(
// at the next restart attach it again.
// For that to happen, we'd need to make the manifest reflect our *intended* state,
// not our actual state of offloaded timelines.
tenant.store_tenant_manifest().await?;
tenant.maybe_upload_tenant_manifest().await?;
tracing::info!("Timeline offload complete (remaining arc refcount: {remaining_refcount})");

View File

@@ -445,7 +445,7 @@ pub(super) async fn handle_walreceiver_connection(
.inspect_err(|err| {
// TODO: we can't differentiate cancellation errors with
// anyhow::Error, so just ignore it if we're cancelled.
if !cancellation.is_cancelled() {
if !cancellation.is_cancelled() && !timeline.is_stopping() {
critical!("{err:?}")
}
})?;
@@ -577,7 +577,7 @@ pub(super) async fn handle_walreceiver_connection(
.inspect_err(|err| {
// TODO: we can't differentiate cancellation errors with
// anyhow::Error, so just ignore it if we're cancelled.
if !cancellation.is_cancelled() {
if !cancellation.is_cancelled() && !timeline.is_stopping() {
critical!("{err:?}")
}
})?;

View File

@@ -302,6 +302,7 @@ pub struct UploadQueueStoppedDeletable {
pub(super) deleted_at: SetDeletedFlagProgress,
}
#[allow(clippy::large_enum_variant, reason = "TODO")]
pub enum UploadQueueStopped {
Deletable(UploadQueueStoppedDeletable),
Uninitialized,

View File

@@ -136,6 +136,16 @@ macro_rules! bail {
}
}
#[derive(Debug, Clone, Copy)]
pub enum RedoAttemptType {
/// Used for the read path. Will fire critical errors and retry twice if failure.
ReadPage,
// Used for legacy compaction (only used in image compaction). Will fire critical errors and retry once if failure.
LegacyCompaction,
// Used for gc compaction. Will not fire critical errors and not retry.
GcCompaction,
}
///
/// Public interface of WAL redo manager
///
@@ -156,11 +166,18 @@ impl PostgresRedoManager {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, Error> {
if records.is_empty() {
bail!("invalid WAL redo request with no records");
}
let max_retry_attempts = match redo_attempt_type {
RedoAttemptType::ReadPage => 2,
RedoAttemptType::LegacyCompaction => 1,
RedoAttemptType::GcCompaction => 0,
};
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
let mut img = base_img.map(|p| p.1);
let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
@@ -180,6 +197,7 @@ impl PostgresRedoManager {
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
max_retry_attempts,
)
.await
};
@@ -201,6 +219,7 @@ impl PostgresRedoManager {
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
max_retry_attempts,
)
.await
}
@@ -424,11 +443,11 @@ impl PostgresRedoManager {
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
max_retry_attempts: u32,
) -> Result<Bytes, Error> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
let (rel, blknum) = key.to_rel_block().context("invalid record")?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let mut n_attempts = 0u32;
loop {
let base_img = &base_img;
@@ -486,7 +505,7 @@ impl PostgresRedoManager {
info!(n_attempts, "retried walredo succeeded");
}
n_attempts += 1;
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
if n_attempts > max_retry_attempts || result.is_ok() {
return result;
}
}
@@ -560,6 +579,7 @@ mod tests {
use super::PostgresRedoManager;
use crate::config::PageServerConf;
use crate::walredo::RedoAttemptType;
#[tokio::test]
async fn test_ping() {
@@ -593,6 +613,7 @@ mod tests {
None,
short_records(),
14,
RedoAttemptType::ReadPage,
)
.instrument(h.span())
.await
@@ -621,6 +642,7 @@ mod tests {
None,
short_records(),
14,
RedoAttemptType::ReadPage,
)
.instrument(h.span())
.await
@@ -642,6 +664,7 @@ mod tests {
None,
short_records(),
16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
RedoAttemptType::ReadPage,
)
.instrument(h.span())
.await

View File

@@ -276,6 +276,7 @@ pub(crate) fn apply_in_neon(
append,
clear,
will_init,
only_if,
} => {
use bytes::BufMut;
if *will_init {
@@ -288,6 +289,13 @@ pub(crate) fn apply_in_neon(
if *clear {
page.clear();
}
if let Some(only_if) = only_if {
if page != only_if.as_bytes() {
return Err(anyhow::anyhow!(
"the current image does not match the expected image, cannot append"
));
}
}
page.put_slice(append.as_bytes());
}
}

View File

@@ -4,6 +4,7 @@
MODULE_big = neon
OBJS = \
$(WIN32RES) \
communicator.o \
extension_server.o \
file_cache.o \
hll.o \

View File

@@ -9,4 +9,4 @@
#define BITMAP_SET(bm, bit) (bm)[(bit) >> 3] |= (1 << ((bit) & 7))
#define BITMAP_CLR(bm, bit) (bm)[(bit) >> 3] &= ~(1 << ((bit) & 7))
#endif //NEON_BITMAP_H
#endif /* NEON_BITMAP_H */

2504
pgxn/neon/communicator.c Normal file

File diff suppressed because it is too large Load Diff

48
pgxn/neon/communicator.h Normal file
View File

@@ -0,0 +1,48 @@
/*-------------------------------------------------------------------------
*
* communicator.h
* internal interface for communicating with remote pageservers
*
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef COMMUNICATOR_h
#define COMMUNICATOR_h
#include "neon_pgversioncompat.h"
#include "storage/buf_internals.h"
#include "pagestore_client.h"
/* initialization at postmaster startup */
extern void pg_init_communicator(void);
/* initialization at backend startup */
extern void communicator_init(void);
extern bool communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum,
neon_request_lsns *request_lsns);
extern BlockNumber communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum,
neon_request_lsns *request_lsns);
extern int64 communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns);
extern void communicator_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber base_blockno, neon_request_lsns *request_lsns,
void **buffers, BlockNumber nblocks, const bits8 *mask);
extern int communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum,
neon_request_lsns *lsns,
BlockNumber nblocks, void **buffers, bits8 *mask);
extern void communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask);
extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
neon_request_lsns *request_lsns,
void *buffer);
extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(bool IsHandlingInterrupts);
#endif

View File

@@ -13,9 +13,6 @@
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* IDENTIFICATION
* contrib/neon/control_plane_connector.c
*
*-------------------------------------------------------------------------
*/

View File

@@ -3,9 +3,6 @@
* extension_server.c
* Request compute_ctl to download extension files.
*
* IDENTIFICATION
* contrib/neon/extension_server.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"

View File

@@ -3,9 +3,6 @@
* extension_server.h
* Request compute_ctl to download extension files.
*
* IDENTIFICATION
* contrib/neon/extension_server.h
*
*-------------------------------------------------------------------------
*/

View File

@@ -1,4 +1,4 @@
/*
/*-------------------------------------------------------------------------
*
* file_cache.c
*
@@ -6,10 +6,6 @@
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* pgxn/neon/file_cache.c
*
*-------------------------------------------------------------------------
*/
@@ -25,7 +21,6 @@
#include "access/xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include "common/hashfn.h"
#include "pgstat.h"
#include "port/pg_iovec.h"
@@ -47,6 +42,7 @@
#include "hll.h"
#include "bitmap.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
@@ -1567,8 +1563,12 @@ local_cache_pages(PG_FUNCTION_ARGS)
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
/* Skip hole tags */
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
}
}
}
}
@@ -1596,16 +1596,19 @@ local_cache_pages(PG_FUNCTION_ARGS)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
if (GET_STATE(entry, i) == AVAILABLE)
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].forknum = entry->key.forkNum;
fctx->record[n].blocknum = entry->key.blockNum + i;
fctx->record[n].accesscount = entry->access_count;
n += 1;
if (GET_STATE(entry, i) == AVAILABLE)
{
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].forknum = entry->key.forkNum;
fctx->record[n].blocknum = entry->key.blockNum + i;
fctx->record[n].accesscount = entry->access_count;
n += 1;
}
}
}
}

52
pgxn/neon/file_cache.h Normal file
View File

@@ -0,0 +1,52 @@
/*-------------------------------------------------------------------------
*
* file_cache.h
* Local File Cache definitions
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef FILE_CACHE_h
#define FILE_CACHE_h
#include "neon_pgversioncompat.h"
/* GUCs */
extern bool lfc_store_prefetch_result;
/* functions for local file cache */
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);
/* returns number of blocks read, with one bit set in *read for each */
extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, void **buffers,
BlockNumber nblocks, bits8 *mask);
extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}
static inline void
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer)
{
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
#endif /* FILE_CACHE_H */

Some files were not shown because too many files have changed in this diff Show More