Compare commits

..

136 Commits

Author SHA1 Message Date
Conrad Ludgate
85072b715f Merge pull request #11106 from neondatabase/rc/release-proxy/2025-03-06
Proxy release 2025-03-06
2025-03-06 09:53:00 +00:00
github-actions[bot]
6c86fe7143 Proxy release 2025-03-06 2025-03-06 06:02:15 +00:00
JC Grünhage
66d5fe7f5b Merge pull request #11023 from neondatabase/rc/release-proxy/2025-02-27
Proxy release 2025-02-27
2025-02-27 19:10:58 +01:00
github-actions[bot]
a1b9528757 Proxy release 2025-02-27 2025-02-27 16:18:42 +00:00
Ivan Efremov
1423bb8aa2 Merge pull request #11011 from neondatabase/rc/release-proxy/2025-02-27
Proxy release 2025-02-27
2025-02-27 13:57:49 +02:00
github-actions[bot]
332f064a42 Proxy release 2025-02-27 2025-02-27 00:17:57 +00:00
Folke Behrens
c962f2b447 Merge pull request #10903 from neondatabase/rc/release-proxy/2025-02-20
Proxy release 2025-02-20
2025-02-20 10:37:47 +01:00
github-actions[bot]
446b3f9d28 Proxy release 2025-02-20 2025-02-20 06:02:01 +00:00
Conrad Ludgate
23352dc2e9 Merge pull request #10802 from neondatabase/rc/release-proxy/2025-02-13
Proxy release 2025-02-13
2025-02-13 08:41:01 +00:00
github-actions[bot]
c65fc5a955 Proxy release 2025-02-13 2025-02-13 06:02:01 +00:00
Ivan Efremov
3e624581cd Merge pull request #10691 from neondatabase/rc/release-proxy/2025-02-06
Proxy release 2025-02-06
2025-02-06 10:23:43 +02:00
github-actions[bot]
fedf4f169c Proxy release 2025-02-06 2025-02-06 06:02:11 +00:00
Folke Behrens
86d5798108 Merge pull request #10576 from neondatabase/rc/release-proxy/2025-01-30
Proxy release 2025-01-30
2025-01-30 08:52:09 +01:00
github-actions[bot]
8b4088dd8a Proxy release 2025-01-30 2025-01-30 06:02:00 +00:00
Ivan Efremov
c91905e643 Merge pull request #10416 from neondatabase/rc/release-proxy/2025-01-16
Proxy release 2025-01-16
2025-01-16 10:04:38 +02:00
github-actions[bot]
44b4e355a2 Proxy release 2025-01-16 2025-01-16 06:02:04 +00:00
Folke Behrens
03666a1f37 Merge pull request #10320 from neondatabase/rc/release-proxy/2025-01-09
Proxy release 2025-01-09
2025-01-09 10:19:07 +01:00
github-actions[bot]
9c92242ca0 Proxy release 2025-01-09 2025-01-09 06:02:06 +00:00
Conrad Ludgate
a354071dd0 Merge pull request #10180 from neondatabase/rc/release-proxy/2024-12-17
Proxy release 2024-12-17
2024-12-18 06:31:05 +00:00
github-actions[bot]
758680d4f8 Proxy release 2024-12-17 2024-12-17 22:06:42 +00:00
Conrad Ludgate
1738fd0a96 Merge pull request #10107 from neondatabase/rc/release-proxy/2024-12-12
Proxy release 2024-12-12
2024-12-12 10:21:30 +00:00
Conrad Ludgate
87b7edfc72 Merge branch 'release-proxy' into rc/release-proxy/2024-12-12 2024-12-12 09:58:31 +00:00
github-actions[bot]
def05700d5 Proxy release 2024-12-12 2024-12-12 06:02:08 +00:00
Ivan Efremov
b547681e08 Merge pull request #10024 from neondatabase/rc/release-proxy/2024-12-05
Proxy release 2024-12-05
2024-12-05 15:35:35 +02:00
Ivan Efremov
0fd211537b proxy: Present new auth backend cplane_proxy_v1 (#10012)
Implement a new auth backend based on the current Neon backend to switch
to the new Proxy V1 cplane API.

Implements [#21048](https://github.com/neondatabase/cloud/issues/21048)
2024-12-05 13:00:40 +02:00
Yuchen Liang
a83bd4e81c pageserver: fix buffered-writer on macos build (#10019)
## Problem

In https://github.com/neondatabase/neon/pull/9693, we forgot to check
macos build. The [CI
run](https://github.com/neondatabase/neon/actions/runs/12164541897/job/33926455468)
on main showed that macos build failed with unused variables and dead
code.

## Summary of changes

- add `allow(dead_code)` and `allow(unused_variables)` to the relevant
code that is not used on macos.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-05 13:00:40 +02:00
Conrad Ludgate
ecdad5e6d5 chore: update rust-postgres (#10002)
Like #9931 but without rebasing upstream just yet, to try and minimise
the differences.

Removes all proxy-specific commits from the rust-postgres fork, now that
proxy no longer depends on them. Merging upstream changes to come later.
2024-12-05 13:00:40 +02:00
Conrad Ludgate
d028929945 chore: update clap (#10009)
This updates clap to use a new version of anstream
2024-12-05 13:00:40 +02:00
Yuchen Liang
7b0e3db868 pageserver: make BufferedWriter do double-buffering (#9693)
Closes #9387.

## Problem

`BufferedWriter` cannot proceed while the owned buffer is flushing to
disk. We want to implement double buffering so that the flush can happen
in the background. See #9387.

## Summary of changes

- Maintain two owned buffers in `BufferedWriter`.
- The writer is in charge of copying the data into owned, aligned
buffer, once full, submit it to the flush task.
- The flush background task is in charge of flushing the owned buffer to
disk, and returned the buffer to the writer for reuse.
- The writer and the flush background task communicate through a
bi-directional channel.

For in-memory layer, we also need to be able to read from the buffered
writer in `get_values_reconstruct_data`. To handle this case, we did the
following
- Use replace `VirtualFile::write_all` with `VirtualFile::write_all_at`,
and use `Arc` to share it between writer and background task.
- leverage `IoBufferMut::freeze` to get a cheaply clonable `IoBuffer`,
one clone will be submitted to the channel, the other clone will be
saved within the writer to serve reads. When we want to reuse the
buffer, we can invoke `IoBuffer::into_mut`, which gives us back the
mutable aligned buffer.
- InMemoryLayer reads is now aware of the maybe_flushed part of the
buffer.

**Caveat**

- We removed the owned version of write, because this interface does not
work well with buffer alignment. The result is that without direct IO
enabled,
[`download_object`](a439d57050/pageserver/src/tenant/remote_timeline_client/download.rs (L243))
does one more memcpy than before this PR due to the switch to use
`_borrowed` version of the write.
- "Bypass aligned part of write" could be implemented later to avoid
large amount of memcpy.

**Testing**
- use an oneshot channel based control mechanism to make flush behavior
deterministic in test.
- test reading from `EphemeralFile` when the last submitted buffer is
not flushed, in-progress, and done flushing to disk.


## Performance


We see performance improvement for small values, and regression on big
values, likely due to being CPU bound + disk write latency.


[Results](https://www.notion.so/neondatabase/Benchmarking-New-BufferedWriter-11-20-2024-143f189e0047805ba99acda89f984d51?pvs=4)


## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-12-05 13:00:40 +02:00
John Spray
088eb72dd7 tests: make storcon scale test AZ-aware (#9952)
## Problem

We have a scale test for the storage controller which also acts as a
good stress test for scheduling stability. However, it created nodes
with no AZs set.

## Summary of changes

- Bump node count to 6 and set AZs on them.

This is a precursor to other AZ-related PRs, to make sure any new code
that's landed is getting scale tested in an AZ-aware environment.
2024-12-05 13:00:40 +02:00
a-masterov
d550e3f626 Create a branch for compute release (#9637)
## Problem
We practice a manual release flow for the compute module. This will
allow automation of the compute release process.

## Summary of changes
The workflow was modified to make a compute release automatically on the
branch release-compute.
## Checklist before requesting a review

- [x] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2024-12-05 13:00:40 +02:00
Erik Grinaker
8c6b41daf5 Display reqwest error source (#10004)
## Problem

Reqwest errors don't include details about the inner source error. This
means that we get opaque errors like:

```
receive body: error sending request for url (http://localhost:9898/v1/location_config)
```

Instead of the more helpful:

```
receive body: error sending request for url (http://localhost:9898/v1/location_config): operation timed out
```

Touches #9801.

## Summary of changes

Include the source error for `reqwest::Error` wherever it's displayed.
2024-12-05 13:00:40 +02:00
Alexey Kondratov
bbb050459b feat(compute): Set default application_name for pgbouncer connections (#9973)
## Problem

When client specifies `application_name`, pgbouncer propagates it to the
Postgres. Yet, if client doesn't do it, we have hard time figuring out
who opens a lot of Postgres connections (including the `cloud_admin`
ones).

See this investigation as an example:
https://neondb.slack.com/archives/C0836R0RZ0D

## Summary of changes

I haven't found this documented, but it looks like pgbouncer accepts
standard Postgres connstring parameters in the connstring in the
`[databases]` section, so put the default `application_name=pgbouncer`
there. That way, we will always see who opens Postgres connections. I
did tests, and if client specifies a `application_name`, pgbouncer
overrides this default, so it only works if it's not specified or set to
blank `&application_name=` in the connection string.

This is the last place we could potentially open some Postgres
connections without `application_name`. Everything else should be either
of two:
1. Direct client connections without `application_name`, but these
should be strictly non-`cloud_admin` ones
2. Some ad-hoc internal connections, so if we see spikes of unidentified
`cloud_admin` connections, we will need to investigate it again.

Fixes neondatabase/cloud#20948
2024-12-05 13:00:40 +02:00
Conrad Ludgate
cab498c787 feat(proxy): add option to forward startup params (#9979)
(stacked on #9990 and #9995)

Partially fixes #1287 with a custom option field to enable the fixed
behaviour. This allows us to gradually roll out the fix without silently
changing the observed behaviour for our customers.

related to https://github.com/neondatabase/cloud/issues/15284
2024-12-05 13:00:40 +02:00
Folke Behrens
6359342ffb Assign /libs/proxy/ to proxy team (#10003) 2024-12-05 13:00:40 +02:00
Erik Grinaker
13285c2a5e pageserver: return proper status code for heatmap_upload errors (#9991)
## Problem

During deploys, we see a lot of 500 errors due to heapmap uploads for
inactive tenants. These should be 503s instead.

Resolves #9574.

## Summary of changes

Make the secondary tenant scheduler use `ApiError` rather than
`anyhow::Error`, to propagate the tenant error and convert it to an
appropriate status code.
2024-12-05 13:00:40 +02:00
Peter Bendel
33790d14a3 fix parsing human time output like "50m37s" (#10001)
## Problem

In ingest_benchmark.yml workflow we use pgcopydb tool to migrate
project.
pgcopydb logs human time.

Our parsing of the human time doesn't work for times like "50m37s".

[Example
workflow](https://github.com/neondatabase/neon/actions/runs/12145539948/job/33867418065#step:10:479)

contains "57m45s"

but we
[reported](https://github.com/neondatabase/neon/actions/runs/12145539948/job/33867418065#step:10:500)
only the seconds part: 
45.000 s


## Summary of changes

add a regex pattern for Minute/Second combination
2024-12-05 13:00:40 +02:00
Peter Bendel
709b8cd371 optimize parms for ingest bench (#9999)
## Problem

we tried different parallelism settings for ingest bench 

## Summary of changes

the following settings seem optimal after merging
- SK side Wal filtering
- batched getpages

Settings:
- effective_io_concurrency 100
- concurrency limit 200 (different from Prod!)
- jobs 4, maintenance workers 7
- 10 GB chunk size
2024-12-05 13:00:40 +02:00
Vlad Lazar
1c9bbf1a92 storcon: return an error for drain attempts while paused (#9997)
## Problem

We currently allow drain operations to proceed while the node policy is
paused.

## Summary of changes

Return a precondition failed error in such cases. The orchestrator is
updated in https://github.com/neondatabase/infra/pull/2544 to skip drain
and fills if the pageserver is paused.

Closes: https://github.com/neondatabase/neon/issues/9907
2024-12-05 13:00:40 +02:00
Christian Schwarz
16163fb850 page_service: enable batching in Rust & Python Tests + Python benchmarks (#9993)
This is the first step towards batching rollout.

Refs

- rollout plan: https://github.com/neondatabase/cloud/issues/20620
- task https://github.com/neondatabase/neon/issues/9377
- uber-epic: https://github.com/neondatabase/neon/issues/9376
2024-12-05 13:00:40 +02:00
Alexander Bayandin
73ccc2b08c test_page_service_batching: fix non-numeric metrics (#9998)
## Problem

```
2024-12-03T15:42:46.5978335Z + poetry run python /__w/neon/neon/scripts/ingest_perf_test_result.py --ingest /__w/neon/neon/test_runner/perf-report-local
2024-12-03T15:42:49.5325077Z Traceback (most recent call last):
2024-12-03T15:42:49.5325603Z   File "/__w/neon/neon/scripts/ingest_perf_test_result.py", line 165, in <module>
2024-12-03T15:42:49.5326029Z     main()
2024-12-03T15:42:49.5326316Z   File "/__w/neon/neon/scripts/ingest_perf_test_result.py", line 155, in main
2024-12-03T15:42:49.5326739Z     ingested = ingest_perf_test_result(cur, item, recorded_at_timestamp)
2024-12-03T15:42:49.5327488Z                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-12-03T15:42:49.5327914Z   File "/__w/neon/neon/scripts/ingest_perf_test_result.py", line 99, in ingest_perf_test_result
2024-12-03T15:42:49.5328321Z     psycopg2.extras.execute_values(
2024-12-03T15:42:49.5328940Z   File "/github/home/.cache/pypoetry/virtualenvs/non-package-mode-_pxWMzVK-py3.11/lib/python3.11/site-packages/psycopg2/extras.py", line 1299, in execute_values
2024-12-03T15:42:49.5335618Z     cur.execute(b''.join(parts))
2024-12-03T15:42:49.5335967Z psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type numeric: "concurrent-futures"
2024-12-03T15:42:49.5336287Z LINE 57:             'concurrent-futures',
2024-12-03T15:42:49.5336462Z                      ^
```

## Summary of changes
- `test_page_service_batching`: save non-numeric params as `labels`
- Add a runtime check that `metric_value` is NUMERIC
2024-12-05 13:00:40 +02:00
Christian Schwarz
c719be6474 tests & benchmarks: unify the way we customize the default tenant config (#9992)
Before this PR, some override callbacks used `.default()`, others
used `.setdefault()`.

As of this PR, all callbacks use `.setdefault()` which I think is least
prone to failure.

Aligning on a single way will set the right example for future tests
that need such customization.

The `test_pageserver_getpage_throttle.py` technically is a change in
behavior: before, it replaced the `tenant_config` field, now it just
configures the throttle. This is what I believe is intended anyway.
2024-12-05 13:00:40 +02:00
Arpad Müller
718645e56c Support tenant manifests in the scrubber (#9942)
Support tenant manifests in the storage scrubber:

* list the manifests, order them by generation
* delete all manifests except for the two most recent generations
* for the latest manifest: try parsing it.

I've tested this patch by running the against a staging bucket and it
successfully deleted stuff (and avoided deleting the latest two
generations).

In follow-up work, we might want to also check some invariants of the
manifest, as mentioned in #8088.

Part of #9386
Part of #8088

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-12-05 13:00:40 +02:00
Conrad Ludgate
fbc8c36983 chore(proxy): enforce single host+port (#9995)
proxy doesn't ever provide multiple hosts/ports, so this code adds a lot
of complexity of error handling for no good reason.

(stacked on #9990)
2024-12-05 13:00:40 +02:00
Alexey Immoreev
5519e42612 Improvement: add console redirect timeout warning (#9985)
## Problem

There is no information on session being cancelled in 2 minutes at the
moment

## Summary of changes

The timeout being logged for the user
2024-12-05 13:00:40 +02:00
Erik Grinaker
4157eaf4c5 pageserver: respond to multiple shutdown signals (#9982)
## Problem

The Pageserver signal handler would only respond to a single signal and
initiate shutdown. Subsequent signals were ignored. This meant that a
`SIGQUIT` sent after a `SIGTERM` had no effect (e.g. in the case of a
slow or stalled shutdown). The `test_runner` uses this to force shutdown
if graceful shutdown is slow.

Touches #9740.

## Summary of changes

Keep responding to signals after the initial shutdown signal has been
received.

Arguably, the `test_runner` should also use `SIGKILL` rather than
`SIGQUIT` in this case, but it seems reasonable to respond to `SIGQUIT`
regardless.
2024-12-05 13:00:40 +02:00
Conrad Ludgate
60241127e2 chore(proxy): remove postgres config parser and md5 support (#9990)
Keeping the `mock` postgres cplane adaptor using "stock" tokio-postgres
allows us to remove a lot of dead weight from our actual postgres
connection logic.
2024-12-05 13:00:40 +02:00
John Spray
f7d5322e8b pageserver: more detailed logs when calling re-attach (#9996)
## Problem

We saw a peculiar case where a pageserver apparently got a 0-tenant
response to `/re-attach` but we couldn't see the request landing on a
storage controller. It was hard to confirm retrospectively that the
pageserver was configured properly at the moment it sent the request.

## Summary of changes

- Log the URL to which we are sending the request
- Log the NodeId and metadata that we sent
2024-12-05 13:00:40 +02:00
John Spray
41bb9c5280 pageserver: only store SLRUs & aux files on shard zero (#9786)
## Problem

Since https://github.com/neondatabase/neon/pull/9423 the non-zero shards
no longer need SLRU content in order to do GC. This data is now
redundant on shards >0.

One release cycle after merging that PR, we may merge this one, which
also stops writing those pages to shards > 0, reaping the efficiency
benefit.

Closes: https://github.com/neondatabase/neon/issues/7512
Closes: https://github.com/neondatabase/neon/issues/9641

## Summary of changes

- Avoid storing SLRUs on non-zero shards
- Bonus: avoid storing aux files on non-zero shards
2024-12-05 13:00:40 +02:00
John Spray
69c0d61c5c storcon: in shard splits, inherit parent's AZ (#9946)
## Problem

Sharded tenants should be run in a single AZ for best performance, so
that computes have AZ-local latency to all the shards.

Part of https://github.com/neondatabase/neon/issues/8264

## Summary of changes

- When we split a tenant, instead of updating each shard's preferred AZ
to wherever it is scheduled, propagate the preferred AZ from the parent.
- Drop the check in `test_shard_preferred_azs` that asserts shards end
up in their preferred AZ: this will not be true again until the
optimize_attachment logic is updated to make this so. The existing check
wasn't testing anything about scheduling, it was just asserting that we
set preferred AZ in a way that matches the way things happen to be
scheduled at time of split.
2024-12-05 13:00:40 +02:00
Christian Schwarz
63cb8ce975 pageserver: only throttle pagestream requests & bring back throttling deduction for smgr latency metrics (#9962)
## Problem

In the batching PR 
- https://github.com/neondatabase/neon/pull/9870

I stopped deducting the time-spent-in-throttle fro latency metrics,
i.e.,
- smgr latency metrics (`SmgrOpTimer`)
- basebackup latency (+scan latency, which I think is part of
basebackup).

The reason for stopping the deduction was that with the introduction of
batching, the trick with tracking time-spent-in-throttle inside
RequestContext and swap-replacing it from the `impl Drop for
SmgrOpTimer` no longer worked with >1 requests in a batch.

However, deducting time-spent-in-throttle is desirable because our
internal latency SLO definition does not account for throttling.

## Summary of changes

- Redefine throttling to be a page_service pagestream request throttle
instead of a throttle for repository `Key` reads through `Timeline::get`
/ `Timeline::get_vectored`.
- This means reads done by `basebackup` are no longer subject to any
throttle.
- The throttle applies after batching, before handling of the request.
- Drive-by fix: make throttle sensitive to cancellation.
- Rename metric label `kind` from `timeline_get` to `pagestream` to
reflect the new scope of throttling.

To avoid config format breakage, we leave the config field named
`timeline_get_throttle` and ignore the `task_kinds` field.
This will be cleaned up in a future PR.

## Trade-Offs

Ideally, we would apply the throttle before reading a request off the
connection, so that we queue the minimal amount of work inside the
process.
However, that's not possible because we need to do shard routing.

The redefinition of the throttle to limit pagestream request rate
instead of repository `Key` rate comes with several downsides:
- We're no longer able to use the throttle mechanism for other other
tasks, e.g. image layer creation.
  However, in practice, we never used that capability anyways.
- We no longer throttle basebackup.
2024-12-05 13:00:40 +02:00
Erik Grinaker
907e4aa3c4 test_runner: use immediate shutdown in test_sharded_ingest (#9984)
## Problem

`test_sharded_ingest` ingests a lot of data, which can cause shutdown to
be slow e.g. due to local "S3 uploads" or compactions. This can cause
test flakes during teardown.

Resolves #9740.

## Summary of changes

Perform an immediate shutdown of the cluster.
2024-12-05 13:00:40 +02:00
Erik Grinaker
0a2a84b766 safekeeper,pageserver: add heap profiling (#9778)
## Problem

We don't have good observability for memory usage. This would be useful
e.g. to debug OOM incidents or optimize performance or resource usage.

We would also like to use continuous profiling with e.g. [Grafana Cloud
Profiles](https://grafana.com/products/cloud/profiles-for-continuous-profiling/)
(see https://github.com/neondatabase/cloud/issues/14888).

This PR is intended as a proof of concept, to try it out in staging and
drive further discussions about profiling more broadly.

Touches https://github.com/neondatabase/neon/issues/9534.
Touches https://github.com/neondatabase/cloud/issues/14888.
Depends on #9779.
Depends on #9780.

## Summary of changes

Adds a HTTP route `/profile/heap` that takes a heap profile and returns
it. Query parameters:

* `format`: output format (`jemalloc` or `pprof`; default `pprof`).

Unlike CPU profiles (see #9764), heap profiles are not symbolized and
require the original binary to translate addresses to function names. To
make this work with Grafana, we'll probably have to symbolize the
process server-side -- this is left as future work, as is other output
formats like SVG.

Heap profiles don't work on macOS due to limitations in jemalloc.
2024-12-05 13:00:40 +02:00
a-masterov
85b12ddd52 Add support for the extensions test for Postgres v17 (#9748)
## Problem
The extensions for Postgres v17 are ready but we do not test the
extensions shipped with v17
## Summary of changes
Build the test image based on Postgres v17. Run the tests for v17.

---------

Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
2024-12-05 13:00:40 +02:00
Christian Schwarz
dd76f1eeee page_service: batching observability & include throttled time in smgr metrics (#9870)
This PR 

- fixes smgr metrics https://github.com/neondatabase/neon/issues/9925 
- adds an additional startup log line logging the current batching
config
- adds a histogram of batch sizes global and per-tenant
- adds a metric exposing the current batching config

The issue described #9925 is that before this PR, request latency was
only observed *after* batching.
This means that smgr latency metrics (most importantly getpage latency)
don't account for
- `wait_lsn` time 
- time spent waiting for batch to fill up / the executor stage to pick
up the batch.

The fix is to use a per-request batching timer, like we did before the
initial batching PR.
We funnel those timers through the entire request lifecycle.

I noticed that even before the initial batching changes, we weren't
accounting for the time spent writing & flushing the response to the
wire.
This PR drive-by fixes that deficiency by dropping the timers at the
very end of processing the batch, i.e., after the `pgb.flush()` call.

I was **unable to maintain the behavior that we deduct
time-spent-in-throttle from various latency metrics.
The reason is that we're using a *single* counter in `RequestContext` to
track micros spent in throttle.
But there are *N* metrics timers in the batch, one per request.
As a consequence, the practice of consuming the counter in the drop
handler of each timer no longer works because all but the first timer
will encounter error `close() called on closed state`.
A failed attempt to maintain the current behavior can be found in
https://github.com/neondatabase/neon/pull/9951.

So, this PR remvoes the deduction behavior from all metrics.
I started a discussion on Slack about it the implications this has for
our internal SLO calculation:
https://neondb.slack.com/archives/C033RQ5SPDH/p1732910861704029

# Refs

- fixes https://github.com/neondatabase/neon/issues/9925
- sub-issue https://github.com/neondatabase/neon/issues/9377
- epic: https://github.com/neondatabase/neon/issues/9376
2024-12-05 13:00:40 +02:00
Christian Schwarz
8963ac85f9 storcon_cli tenant-describe: include tenant-wide information in output (#9899)
Before this PR, the storcon_cli didn't have a way to show the
tenant-wide information of the TenantDescribeResponse.

Sadly, the `Serialize` impl for the tenant config doesn't skip on
`None`, so, the output becomes a bit bloated.
Maybe we can use `skip_serializing_if(Option::is_none)` in the future.
=> https://github.com/neondatabase/neon/issues/9983
2024-12-05 13:00:40 +02:00
John Spray
4a488b3e24 storcon: use proper schedule context during node delete (#9958)
## Problem

I was touching `test_storage_controller_node_deletion` because for AZ
scheduling work I was adding a change to the storage controller (kick
secondaries during optimisation) that made a FIXME in this test defunct.
While looking at it I also realized that we can easily fix the way node
deletion currently doesn't use a proper ScheduleContext, using the
iterator type recently added for that purpose.

## Summary of changes

- A testing-only behavior in storage controller where if a secondary
location isn't yet ready during optimisation, it will be actively
polled.
- Remove workaround in `test_storage_controller_node_deletion` that
previously was needed because optimisation would get stuck on cold
secondaries.
- Update node deletion code to use a `TenantShardContextIterator` and
thereby a proper ScheduleContext
2024-12-05 13:00:40 +02:00
Alexey Kondratov
c4987b0b13 fix(testing): Use 1 MB shared_buffers even with LFC (#9969)
## Problem

After enabling LFC in tests and lowering `shared_buffers` we started
having more problems with `test_pg_regress`.

## Summary of changes

Set `shared_buffers` to 1MB to both exercise getPage requests/LFC, and
still have enough room for Postgres to operate. Everything smaller might
be not enough for Postgres under load, and can cause errors like 'no
unpinned buffers available'.

See Konstantin's comment [1] as well.

Fixes #9956

[1]:
https://github.com/neondatabase/neon/issues/9956#issuecomment-2511608097
2024-12-05 13:00:40 +02:00
Tristan Partin
84b4821118 Stop changing the value of neon.extension_server_port at runtime (#9972)
On reconfigure, we no longer passed a port for the extension server
which caused us to not write out the neon.extension_server_port line.
Thus, Postgres thought we were setting the port to the default value of
0. PGC_POSTMASTER GUCs cannot be set at runtime, which causes the
following log messages:

> LOG: parameter "neon.extension_server_port" cannot be changed without
restarting the server
> LOG: configuration file
"/var/db/postgres/compute/pgdata/postgresql.conf" contains errors;
unaffected changes were applied

Fixes: https://github.com/neondatabase/neon/issues/9945

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-12-05 13:00:40 +02:00
Conrad Ludgate
32ba9811f9 feat(proxy): emit JWT auth method and JWT issuer in parquet logs (#9971)
Fix the HTTP AuthMethod to accomodate the JWT authorization method.
Introduces the JWT issuer as an additional field in the parquet logs
2024-12-05 13:00:40 +02:00
Folke Behrens
a0cd64c4d3 Bump OTel, tracing, reqwest crates (#9970) 2024-12-05 13:00:40 +02:00
Arseny Sher
84687b743d Update consensus protocol spec (#9607)
The spec was written for the buggy protocol which we had before the one
more similar to Raft was implemented. Update the spec with what we
currently have.

ref https://github.com/neondatabase/neon/issues/8699
2024-12-05 13:00:40 +02:00
Folke Behrens
b6f93dcec9 proxy: Create Elasticache credentials provider lazily (#9967)
## Problem

The credentials providers tries to connect to AWS STS even when we use
plain Redis connections.

## Summary of changes

* Construct the CredentialsProvider only when needed ("irsa").
2024-12-05 13:00:40 +02:00
Alexander Bayandin
4f6c594973 CI(replication-tests): fix notifications about replication-tests failures (#9950)
## Problem

`if: ${{ github.event.schedule }}` gets skipped if a previous step has
failed, but we want to run the step for both `success` and `failure`

## Summary of changes
- Add `!cancelled()` to notification step if-condition, to skip only
cancelled jobs
2024-12-05 13:00:40 +02:00
Conrad Ludgate
a750c14735 fix(proxy): forward notifications from authentication (#9948)
Fixes https://github.com/neondatabase/cloud/issues/20973. 

This refactors `connect_raw` in order to return direct access to the
delayed notices.

I cannot find a way to test this with psycopg2 unfortunately, although
testing it with psql does return the expected results.
2024-12-05 13:00:40 +02:00
John Spray
9ce0dd4e55 storcon: add metric for AZ scheduling violations (#9949)
## Problem

We can't easily tell how far the state of shards is from their AZ
preferences. This can be a cause of performance issues, so it's
important for diagnosability that we can tell easily if there are
significant numbers of shards that aren't running in their preferred AZ.

Related: https://github.com/neondatabase/cloud/issues/15413

## Summary of changes

- In reconcile_all, count shards that are scheduled into the wrong AZ
(if they have a preference), and publish it as a prometheus gauge.
- Also calculate a statistic for how many shards wanted to reconcile but
couldn't.

This is clearly a lazy calculation: reconcile all only runs
periodically. But that's okay: shards in the wrong AZ is something that
only matters if it stays that way for some period of time.
2024-12-05 13:00:40 +02:00
Erik Grinaker
0e1a336607 test_runner: improve wait_until (#9936)
Improves `wait_until` by:

* Use `timeout` instead of `iterations`. This allows changing the
timeout/interval parameters independently.
* Make `timeout` and `interval` optional (default 20s and 0.5s). Most
callers don't care.
* Only output status every 1s by default, and add optional
`status_interval` parameter.
* Remove `show_intermediate_error`, this was always emitted anyway.

Most callers have been updated to use the defaults, except where they
had good reason otherwise.
2024-12-05 13:00:40 +02:00
Anastasia Lubennikova
7fc2912d06 Update pgvector to 0.8.0 (#9733) 2024-12-05 13:00:40 +02:00
John Spray
fdf231c237 storcon: don't take any Service locks in /status and /ready (#9944)
## Problem

We saw unexpected container terminations when running in k8s with with
small CPU resource requests.

The /status and /ready handlers called `maybe_forward`, which always
takes the lock on Service::inner.

If there is a lot of writer lock contention, and the container is
starved of CPU, this increases the likelihood that we will get killed by
the kubelet.

It isn't certain that this was a cause of issues, but it is a potential
source that we can eliminate.

## Summary of changes

- Revise logic to return immediately if the URL is in the non-forwarded
list, rather than calling maybe_forward
2024-12-05 13:00:40 +02:00
Konstantin Knizhnik
1e08b5dccc Fix issues with prefetch ring buffer resize (#9847)
## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1732110190129479


We observe the following error in the logs 
```
[XX000] ERROR: [NEON_SMGR] [shard 3] Incorrect prefetch read: status=1 response=0x7fafef335138 my=128 receive=128
```
most likely caused by changing `neon.readahead_buffer_size`

## Summary of changes

1. Copy shard state
2. Do not use prefetch_set_unused in readahead_buffer_resize
3. Change prefetch buffer overflow criteria

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-12-05 13:00:40 +02:00
Alexander Bayandin
030810ed3e Compute image: prepare Postgres v14-v16 for Debian 12 (#9954)
## Problem

Current compute images for Postgres 14-16 don't build on Debian 12
because of issues with extensions.
This PR fixes that, but for the current setup, it is mostly a no-op
change.

## Summary of changes
- Use `/bin/bash -euo pipefail` as SHELL to fail earlier
- Fix `plv8` build: backport a trivial patch for v8
- Fix `postgis` build: depend `sfgal` version on Debian version instead
of Postgres version


Tested in: https://github.com/neondatabase/neon/pull/9849
2024-12-05 13:00:40 +02:00
Konstantin Knizhnik
62b74bdc2c Add GUC controlling whether to pause recovery if some critical GUCs at replica have smaller value than on primary (#9057)
## Problem

See https://github.com/neondatabase/neon/issues/9023

## Summary of changes

Ass GUC `recovery_pause_on_misconfig` allowing not to pause in case of
replica and primary configuration mismatch

See https://github.com/neondatabase/postgres/pull/501
See https://github.com/neondatabase/postgres/pull/502
See https://github.com/neondatabase/postgres/pull/503
See https://github.com/neondatabase/postgres/pull/504


## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2024-12-05 13:00:40 +02:00
Folke Behrens
8b7e9ed820 Merge the consumption metric pushes (#9939)
#8564

## Problem

The main and backup consumption metric pushes are completely
independent,
resulting in different event time windows and different idempotency
keys.

## Summary of changes

* Merge the push tasks, but keep chunks the same size.
2024-12-05 13:00:40 +02:00
Christian Schwarz
5dad89acd4 page_service: rewrite batching to work without a timeout (#9851)
# Problem

The timeout-based batching adds latency to unbatchable workloads.

We can choose a short batching timeout (e.g. 10us) but that requires
high-resolution timers, which tokio doesn't have.
I thoroughly explored options to use OS timers (see
[this](https://github.com/neondatabase/neon/pull/9822) abandoned PR).
In short, it's not an attractive option because any timer implementation
adds non-trivial overheads.

# Solution

The insight is that, in the steady state of a batchable workload, the
time we spend in `get_vectored` will be hundreds of microseconds anyway.

If we prepare the next batch concurrently to `get_vectored`, we will
have a sizeable batch ready once `get_vectored` of the current batch is
done and do not need an explicit timeout.

This can be reasonably described as **pipelining of the protocol
handler**.

# Implementation

We model the sub-protocol handler for pagestream requests
(`handle_pagrequests`) as two futures that form a pipeline:

2. Batching: read requests from the connection and fill the current
batch
3. Execution: `take` the current batch, execute it using `get_vectored`,
and send the response.

The Reading and Batching stage are connected through a new type of
channel called `spsc_fold`.

See the long comment in the `handle_pagerequests_pipelined` for details.

# Changes

- Refactor `handle_pagerequests`
    - separate functions for
- reading one protocol message; produces a `BatchedFeMessage` with just
one page request in it
- batching; tried to merge an incoming `BatchedFeMessage` into an
existing `BatchedFeMessage`; returns `None` on success and returns back
the incoming message in case merging isn't possible
        - execution of a batched message
- unify the timeline handle acquisition & request span construction; it
now happen in the function that reads the protocol message
- Implement serial and pipelined model
    - serial: what we had before any of the batching changes
      - read one protocol message
      - execute protocol messages
    - pipelined: the design described above
- optionality for execution of the pipeline: either via concurrent
futures vs tokio tasks
- Pageserver config
  - remove batching timeout field
  - add ability to configure pipelining mode
- add ability to limit max batch size for pipelined configurations
(required for the rollout, cf
https://github.com/neondatabase/cloud/issues/20620 )
  - ability to configure execution mode
- Tests
  - remove `batch_timeout` parametrization
  - rename `test_getpage_merge_smoke` to `test_throughput`
- add parametrization to test different max batch sizes and execution
moes
  - rename `test_timer_precision` to `test_latency`
  - rename the test case file to `test_page_service_batching.py`
  - better descriptions of what the tests actually do

## On the holding The `TimelineHandle` in the pending batch

While batching, we hold the `TimelineHandle` in the pending batch.
Therefore, the timeline will not finish shutting down while we're
batching.

This is not a problem in practice because the concurrently ongoing
`get_vectored` call will fail quickly with an error indicating that the
timeline is shutting down.
This results in the Execution stage returning a `QueryError::Shutdown`,
which causes the pipeline / entire page service connection to shut down.
This drops all references to the
`Arc<Mutex<Option<Box<BatchedFeMessage>>>>` object, thereby dropping the
contained `TimelineHandle`s.

- => fixes https://github.com/neondatabase/neon/issues/9850

# Performance

Local run of the benchmarks, results in [this empty
commit](1cf5b1463f)
in the PR branch.

Key take-aways:
* `concurrent-futures` and `tasks` deliver identical `batching_factor`
* tail latency impact unknown, cf
https://github.com/neondatabase/neon/issues/9837
* `concurrent-futures` has higher throughput than `tasks` in all
workloads (=lower `time` metric)
* In unbatchable workloads, `concurrent-futures` has 5% higher
`CPU-per-throughput` than that of `tasks`, and 15% higher than that of
`serial`.
* In batchable-32 workload, `concurrent-futures` has 8% lower
`CPU-per-throughput` than that of `tasks` (comparison to tput of
`serial` is irrelevant)
* in unbatchable workloads, mean and tail latencies of
`concurrent-futures` is practically identical to `serial`, whereas
`tasks` adds 20-30us of overhead

Overall, `concurrent-futures` seems like a slightly more attractive
choice.

# Rollout

This change is disabled-by-default.

Rollout plan:
- https://github.com/neondatabase/cloud/issues/20620

# Refs

- epic: https://github.com/neondatabase/neon/issues/9376
- this sub-task: https://github.com/neondatabase/neon/issues/9377
- the abandoned attempt to improve batching timeout resolution:
https://github.com/neondatabase/neon/pull/9820
- closes https://github.com/neondatabase/neon/issues/9850
- fixes https://github.com/neondatabase/neon/issues/9835
2024-12-05 13:00:40 +02:00
Matthias van de Meent
547b2d2827 Fix timeout value used in XLogWaitForReplayOf (#9937)
The previous value assumed usec precision, while the timeout used is in
milliseconds, causing replica backends to wait for (potentially) many
hours for WAL replay without the expected progress reports in logs.

This fixes the issue.

Reported-By: Alexander Lakhin <exclusion@gmail.com>

## Problem


https://github.com/neondatabase/postgres/pull/279#issuecomment-2507671817

The timeout value was configured with the assumption the indicated value
would be microseconds, where it's actually milliseconds. That causes the
backend to wait for much longer (2h46m40s) before it emits the "I'm
waiting for recovery" message. While we do have wait events configured
on this, it's not great to have stuck backends without clear logs, so
this fixes the timeout value in all our PostgreSQL branches.

## PG PRs

* PG14: https://github.com/neondatabase/postgres/pull/542
* PG15: https://github.com/neondatabase/postgres/pull/543
* PG16: https://github.com/neondatabase/postgres/pull/544
* PG17: https://github.com/neondatabase/postgres/pull/545
2024-12-05 13:00:40 +02:00
Gleb Novikov
93f29a0065 Fixed fast_import pgbin in calling get_pg_version (#9933)
Was working on https://github.com/neondatabase/cloud/pull/20795 and
discovered that fast_import is not working normally.
2024-12-05 13:00:40 +02:00
John Spray
4f36494615 pageserver: download small objects using a smaller timeout (#9938)
## Problem

It appears that the Azure storage API tends to hang TCP connections more
than S3 does.

Currently we use a 2 minute timeout for all downloads. This is large
because sometimes the objects we download are large. However, waiting 2
minutes when doing something like downloading a manifest on tenant
attach is problematic, because when someone is doing a "create tenant,
create timeline" workflow, that 2 minutes is long enough for them
reasonably to give up creating that timeline.

Rather than propagate oversized timeouts further up the stack, we should
use a different timeout for objects that we expect to be small.

Closes: https://github.com/neondatabase/neon/issues/9836

## Summary of changes

- Add a `small_timeout` configuration attribute to remote storage,
defaulting to 30 seconds (still a very generous period to do something
like download an index)
- Add a DownloadKind parameter to DownloadOpts, so that callers can
indicate whether they expect the object to be small or large.
- In the azure client, use small timeout for HEAD requests, and for GET
requests if DownloadKind::Small is used.
- Use DownloadKind::Small for manifests, indices, and heatmap downloads.

This PR intentionally does not make the equivalent change to the S3
client, to reduce blast radius in case this has unexpected consequences
(we could accomplish the same thing by editing lots of configs, but just
skipping the code is simpler for right now)
2024-12-05 13:00:40 +02:00
Alexey Kondratov
0a550f3e7d feat(compute_ctl): Always set application_name (#9934)
## Problem

It was not always possible to judge what exactly some `cloud_admin`
connections were doing because we didn't consistently set
`application_name` everywhere.

## Summary of changes

Unify the way we connect to Postgres:
1. Switch to building configs everywhere
2. Always set `application_name` and make naming consistent

Follow-up for #9919
Part of neondatabase/cloud#20948
2024-12-05 13:00:40 +02:00
Erik Grinaker
4bb9554e4a safekeeper: use jemalloc (#9780)
## Problem

To add Safekeeper heap profiling in #9778, we need to switch to an
allocator that supports it. Pageserver and proxy already use jemalloc.

Touches #9534.

## Summary of changes

Use jemalloc in Safekeeper.
2024-12-05 13:00:40 +02:00
John Spray
008616cfe6 storage controller: use proper ScheduleContext when evacuating a node (#9908)
## Problem

When picking locations for a shard, we should use a ScheduleContext that
includes all the other shards in the tenant, so that we apply proper
anti-affinity between shards. If we don't do this, then it can lead to
unstable scheduling, where we place a shard somewhere that the optimizer
will then immediately move it away from.

We didn't always do this, because it was a bit awkward to accumulate the
context for a tenant rather than just walking tenants.

This was a TODO in `handle_node_availability_transition`:
```
                        // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
                        // for tenants without secondary locations: if they have a secondary location, then this
                        // schedule() call is just promoting an existing secondary)
```

This is a precursor to https://github.com/neondatabase/neon/issues/8264,
where the current imperfect scheduling during node evacuation hampers
testing.

## Summary of changes

- Add an iterator type that yields each shard along with a
schedulecontext that includes all the other shards from the same tenant
- Use the iterator to replace hand-crafted logic in optimize_all_plan
(functionally identical)
- Use the iterator in `handle_node_availability_transition` to apply
proper anti-affinity during node evacuation.
2024-12-05 13:00:40 +02:00
Conrad Ludgate
e61ec94fbc chore(proxy): vendor a subset of rust-postgres (#9930)
Our rust-postgres fork is getting messy. Mostly because proxy wants more
control over the raw protocol than tokio-postgres provides. As such,
it's diverging more and more. Storage and compute also make use of
rust-postgres, but in more normal usage, thus they don't need our crazy
changes.

Idea: 
* proxy maintains their subset
* other teams use a minimal patch set against upstream rust-postgres

Reviewing this code will be difficult. To implement it, I
1. Copied tokio-postgres, postgres-protocol and postgres-types from
00940fcdb5
2. Updated their package names with the `2` suffix to make them compile
in the workspace.
3. Updated proxy to use those packages
4. Copied in the code from tokio-postgres-rustls 0.13 (with some patches
applied https://github.com/jbg/tokio-postgres-rustls/pull/32
https://github.com/jbg/tokio-postgres-rustls/pull/33)
5. Removed as much dead code as I could find in the vendored libraries
6. Updated the tokio-postgres-rustls code to use our existing channel
binding implementation
2024-12-05 13:00:40 +02:00
Erik Grinaker
e5152551ad test_runner/performance: add logical message ingest benchmark (#9749)
Adds a benchmark for logical message WAL ingestion throughput
end-to-end. Logical messages are essentially noops, and thus ignored by
the Pageserver.

Example results from my MacBook, with fsync enabled:

```
postgres_ingest: 14.445 s
safekeeper_ingest: 29.948 s
pageserver_ingest: 30.013 s
pageserver_recover_ingest: 8.633 s
wal_written: 10,340 MB
message_count: 1310720 messages
postgres_throughput: 715 MB/s
safekeeper_throughput: 345 MB/s
pageserver_throughput: 344 MB/s
pageserver_recover_throughput: 1197 MB/s
```

See
https://github.com/neondatabase/neon/issues/9642#issuecomment-2475995205
for running analysis.

Touches #9642.
2024-12-05 13:00:40 +02:00
Alexey Kondratov
b0822a5499 fix(compute_ctl): Allow usage of DB names with whitespaces (#9919)
## Problem

We used `set_path()` to replace the database name in the connection
string. It automatically does url-safe encoding if the path is not
already encoded, but it does it as per the URL standard, which assumes
that tabs can be safely removed from the path without changing the
meaning of the URL. See, e.g.,
https://url.spec.whatwg.org/#concept-basic-url-parser. It also breaks
for DBs with properly %-encoded names, like with `%20`, as they are kept
intact, but actually should be escaped.

Yet, this is not true for Postgres, where it's completely valid to have
trailing tabs in the database name.

I think this is the PR that caused this regression
https://github.com/neondatabase/neon/pull/9717, as it switched from
`postgres::config::Config` back to `set_path()`.

This was fixed a while ago already [1], btw, I just haven't added a test
to catch this regression back then :(

## Summary of changes

This commit changes the code back to use
`postgres/tokio_postgres::Config` everywhere.

While on it, also do some changes around, as I had to touch this code:
1. Bump some logging from `debug` to `info` in the spec apply path. We
do not use `debug` in prod, and it was tricky to understand what was
going on with this bug in prod.
2. Refactor configuration concurrency calculation code so it was
reusable. Yet, still keep `1` in the case of reconfiguration. The
database can be actively used at this moment, so we cannot guarantee
that there will be enough spare connection slots, and the underlying
code won't handle connection errors properly.
3. Simplify the installed extensions code. It was spawning a blocking
task inside async function, which doesn't make much sense. Instead, just
have a main sync function and call it with `spawn_blocking` in the API
code -- the only place we need it to be async.
4. Add regression python test to cover this and related problems in the
future. Also, add more extensive testing of schema dump and DBs and
roles listing API.

[1]:
4d1e48f3b9
[2]:
https://www.postgresql.org/message-id/flat/20151023003445.931.91267%40wrigleys.postgresql.org

Resolves neondatabase/cloud#20869
2024-12-05 13:00:40 +02:00
Alexander Bayandin
1fb6ab59e8 test_runner: rerun all failed tests (#9917)
## Problem

Currently, we rerun only known flaky tests. This approach was chosen to
reduce the number of tests that go unnoticed (by forcing people to take
a look at failed tests and rerun the job manually), but it has some
drawbacks:
- In PRs, people tend to push new changes without checking failed tests
(that's ok)
- In the main, tests are just restarted without checking
(understandable)
- Parametrised tests become flaky one by one, i.e. if `test[1]` is flaky
`, test[2]` is not marked as flaky automatically (which may or may not
be the case).

I suggest rerunning all failed tests to increase the stability of GitHub
jobs and using the Grafana Dashboard with flaky tests for deeper
analysis.

## Summary of changes
- Rerun all failed tests twice at max
2024-12-05 13:00:40 +02:00
Vlad Lazar
e16439400d pageserver: return correct LSN for interpreted proto keep alive responses (#9928)
## Problem

For the interpreted proto the pageserver is not returning the correct
LSN
in replies to keep alive requests. This is because the interpreted
protocol arm
was not updating `last_rec_lsn`.

## Summary of changes

* Return correct LSN in keep-alive responses
* Fix shard field in wal sender traces
2024-12-05 13:00:40 +02:00
Arpad Müller
e401f66698 Update rust to 1.83.0, also update cargo adjacent tools (#9926)
We keep the practice of keeping the compiler up to date, pointing to the
latest release. This is done by many other projects in the Rust
ecosystem as well.

[Release notes](https://releases.rs/docs/1.83.0/).

Also update `cargo-hakari`, `cargo-deny`, `cargo-hack` and
`cargo-nextest` to their latest versions.

Prior update was in #9445.
2024-12-05 13:00:40 +02:00
Erik Grinaker
2fa461b668 Makefile: build pg_visibility (#9922)
Build the `pg_visibility` extension for use with `neon_local`. This is
useful to inspect the visibility map for debugging.

Touches #9914.
2024-12-05 13:00:40 +02:00
Vlad Lazar
03d90bc0b3 remote_storage/abs: count 404 and 304 for get as ok for metrics (#9912)
## Problem

We currently see elevated levels of errors for GetBlob requests. This is
because 404 and 304 are counted as errors for metric reporting.

## Summary of Changes

Bring the implementation in line with the S3 client and treat 404 and
304 responses as ok for metric purposes.

Related: https://github.com/neondatabase/cloud/issues/20666
2024-12-05 13:00:40 +02:00
Ivan Efremov
268bc890ea proxy: spawn cancellation checks in the background (#9918)
## Problem
For cancellation, a connection is open during all the cancel checks.
## Summary of changes
Spawn cancellation checks in the background, and close connection
immediately.
Use task_tracker for cancellation checks.
2024-12-05 13:00:40 +02:00
Folke Behrens
8a6ee79f6f Merge pull request #9921 from neondatabase/rc/release-proxy/2024-11-28
Proxy release 2024-11-28
2024-11-28 11:09:06 +01:00
github-actions[bot]
9052c32b46 Proxy release 2024-11-28 2024-11-28 06:02:15 +00:00
Ivan Efremov
995e729ebe Merge pull request #9832 from neondatabase/rc/release-proxy/2024-11-21
Proxy release 2024-11-21
2024-11-21 09:41:31 +02:00
github-actions[bot]
76077e1ddf Proxy release 2024-11-21 2024-11-21 06:02:11 +00:00
Ivan Efremov
0467d88f06 Merge pull request #9756 from neondatabase/rc/proxy/2024-11-14
Proxy release 2024-11-14
2024-11-14 09:46:52 +02:00
Ivan Efremov
f5eec194e7 Merge pull request #9674 from neondatabase/rc/proxy/2024-11-07
Proxy release 2024-11-07
2024-11-07 12:07:12 +02:00
Conrad Ludgate
7e00be391d Merge pull request #9558 from neondatabase/rc/proxy/2024-10-29
Auth broker release 2024-10-29
2024-10-29 12:10:50 +00:00
Folke Behrens
d56599df2a Merge pull request #9499 from neondatabase/rc/proxy/2024-10-24
Proxy release 2024-10-24
2024-10-24 10:34:56 +02:00
Folke Behrens
9d9aab3680 Merge pull request #9426 from neondatabase/rc/proxy/2024-10-17
Proxy release 2024-10-17
2024-10-17 12:18:51 +02:00
Folke Behrens
a202b1b5cc Merge pull request #9341 from neondatabase/rc/proxy/2024-10-10
Proxy release 2024-10-10
2024-10-10 09:17:11 +02:00
Folke Behrens
90f731f3b1 Merge pull request #9256 from neondatabase/rc/proxy/2024-10-03
Proxy release 2024-10-03
2024-10-03 11:01:41 +02:00
Conrad Ludgate
7736b748d3 Merge pull request #9159 from neondatabase/rc/proxy/2024-09-26
Proxy release 2024-09-26
2024-09-26 09:22:33 +01:00
Conrad Ludgate
9c23333cb3 Merge pull request #9056 from neondatabase/rc/proxy/2024-09-19
Proxy release 2024-09-19
2024-09-19 10:41:17 +01:00
Conrad Ludgate
66a99009ba Merge pull request #8799 from neondatabase/rc/proxy/2024-08-22
Proxy release 2024-08-22
2024-08-22 10:04:56 +01:00
Conrad Ludgate
5d4c57491f Merge pull request #8723 from neondatabase/rc/proxy/2024-08-14
Proxy release 2024-08-14
2024-08-14 13:05:51 +01:00
Conrad Ludgate
73935ea3a2 Merge pull request #8647 from neondatabase/rc/proxy/2024-08-08
Proxy release 2024-08-08
2024-08-08 15:37:09 +01:00
Conrad Ludgate
32e595d4dd Merge branch 'release-proxy' into rc/proxy/2024-08-08 2024-08-08 13:53:33 +01:00
Conrad Ludgate
b0d69acb07 Merge pull request #8505 from neondatabase/rc/proxy/2024-07-25
Proxy release 2024-07-25
2024-07-25 11:07:19 +01:00
Conrad Ludgate
98355a419a Merge pull request #8351 from neondatabase/rc/proxy/2024-07-11
Proxy release 2024-07-11
2024-07-11 10:40:17 +01:00
Conrad Ludgate
cfb03d6cf0 Merge pull request #8178 from neondatabase/rc/proxy/2024-06-27
Proxy release 2024-06-27
2024-06-27 11:35:30 +01:00
Conrad Ludgate
d81ef3f962 Revert "proxy: update tokio-postgres to allow arbitrary config params (#8076)"
This reverts commit 78d9059fc7.
2024-06-27 09:46:58 +01:00
Conrad Ludgate
5d62c67e75 Merge pull request #8117 from neondatabase/rc/proxy/2024-06-20
Proxy release 2024-06-20
2024-06-20 11:42:35 +01:00
Anna Khanova
53d53d5b1e Merge pull request #7980 from neondatabase/rc/proxy/2024-06-06
Proxy release 2024-06-06
2024-06-06 13:14:40 +02:00
Anna Khanova
29fe6ea47a Merge pull request #7909 from neondatabase/rc/proxy/2024-05-30
Proxy release 2024-05-30
2024-05-30 14:59:41 +02:00
Alexander Bayandin
640327ccb3 Merge pull request #7880 from neondatabase/rc/proxy/2024-05-24
Proxy release 2024-05-24
2024-05-24 18:00:18 +01:00
Anna Khanova
7cf0f6b37e Merge pull request #7853 from neondatabase/rc/proxy/2024-05-23
Proxy release 2024-05-23
2024-05-23 12:09:13 +02:00
Anna Khanova
03c2c569be [proxy] Do not fail after parquet upload error (#7858)
## Problem

If the parquet upload was unsuccessful, it will panic.

## Summary of changes

Write error in logs instead.
2024-05-23 11:44:47 +02:00
Conrad Ludgate
eff6d4538a Merge pull request #7654 from neondatabase/rc/proxy/2024-05-08
Proxy release 2024-05-08
2024-05-08 11:56:20 +01:00
Conrad Ludgate
5ef7782e9c Merge pull request #7649 from neondatabase/rc/proxy/2024-05-08
Proxy release 2024-05-08
2024-05-08 06:54:03 +01:00
Conrad Ludgate
73101db8c4 Merge branch 'release-proxy' into rc/proxy/2024-05-08 2024-05-08 06:43:57 +01:00
Anna Khanova
bccdfc6d39 Merge pull request #7580 from neondatabase/rc/proxy/2024-05-02
Proxy release 2024-05-02
2024-05-02 12:00:01 +02:00
Anna Khanova
99595813bb proxy: keep track on the number of events from redis by type. (#7582)
## Problem

It's unclear what is the distribution of messages, proxy is consuming
from redis.

## Summary of changes

Add counter.
2024-05-02 11:56:19 +02:00
Anna Khanova
fe07b54758 Merge pull request #7507 from neondatabase/rc/proxy/2024-04-25
Proxy release 2024-04-25
2024-04-25 13:50:05 +02:00
Anna Khanova
a42d173e7b proxy: Fix cancellations (#7510)
## Problem

Cancellations were published to the channel, that was never read.

## Summary of changes

Fallback to global redis publishing.
2024-04-25 13:42:25 +02:00
Anna Khanova
e07f689238 Update connect to compute and wake compute retry configs (#7509)
## Problem

## Summary of changes

Decrease waiting time
2024-04-25 13:20:21 +02:00
Conrad Ludgate
7831eddc88 Merge pull request #7417 from neondatabase/rc/proxy/2024-04-18
Proxy release 2024-04-18
2024-04-18 12:03:07 +01:00
Conrad Ludgate
943b1bc80c Merge pull request #7366 from neondatabase/proxy-hotfix
Release proxy (2024-04-11 hotfix)
2024-04-12 10:15:14 +01:00
Conrad Ludgate
95a184e9b7 proxy: fix overloaded db connection closure (#7364)
## Problem

possible for the database connections to not close in time.

## Summary of changes

force the closing of connections if the client has hung up
2024-04-11 23:38:47 +01:00
Conrad Ludgate
3fa17e9d17 Merge pull request #7357 from neondatabase/rc/proxy/2024-04-11
Proxy release 2024-04-11
2024-04-11 11:49:45 +01:00
Anna Khanova
55e0fd9789 Merge pull request #7304 from neondatabase/rc/proxy/2024-04-04
Proxy release 2024-04-04
2024-04-04 12:40:11 +02:00
Anna Khanova
2a88889f44 Merge pull request #7254 from neondatabase/rc/proxy/2024-03-27
Proxy release 2024-03-27
2024-03-27 11:44:09 +01:00
Conrad Ludgate
5bad8126dc Merge pull request #7173 from neondatabase/rc/proxy/2024-03-19
Proxy release 2024-03-19
2024-03-19 12:11:42 +00:00
Anna Khanova
27bc242085 Merge pull request #7119 from neondatabase/rc/proxy/2024-03-14
Proxy release 2024-03-14
2024-03-14 14:57:05 +05:00
Anna Khanova
192b49cc6d Merge branch 'release-proxy' into rc/proxy/2024-03-14 2024-03-14 14:16:36 +05:00
Conrad Ludgate
e1b60f3693 Merge pull request #7041 from neondatabase/rc/proxy/2024-03-07
Proxy release 2024-03-07
2024-03-08 08:19:16 +00:00
Anna Khanova
2804f5323b Merge pull request #6997 from neondatabase/rc/proxy/2024-03-04
Proxy release 2024-03-04
2024-03-04 17:36:11 +04:00
Anna Khanova
676adc6b32 Merge branch 'release-proxy' into rc/proxy/2024-03-04 2024-03-04 16:41:46 +04:00
42 changed files with 308 additions and 859 deletions

View File

@@ -3,12 +3,12 @@ name: Periodic pagebench performance test on dedicated EC2 machine in eu-central
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# ┌───────────── hour (0 - 23)
# │ ┌───────────── day of the month (1 - 31)
# │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 */3 * * *' # Runs every 3 hours
# ┌───────────── minute (0 - 59)
# ┌───────────── hour (0 - 23)
# │ ┌───────────── day of the month (1 - 31)
# │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 18 * * *' # Runs at 6 PM UTC every day
workflow_dispatch: # Allows manual triggering of the workflow
inputs:
commit_hash:

View File

@@ -1,9 +1,8 @@
# Autoscaling
/libs/vm_monitor/ @neondatabase/autoscaling
# DevProd & PerfCorr
/.github/ @neondatabase/developer-productivity @neondatabase/performance-correctness
/test_runner/ @neondatabase/performance-correctness
# DevProd
/.github/ @neondatabase/developer-productivity
# Compute
/pgxn/ @neondatabase/compute

13
Cargo.lock generated
View File

@@ -1127,9 +1127,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.2.16"
version = "1.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c"
checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945"
dependencies = [
"jobserver",
"libc",
@@ -4303,7 +4303,6 @@ dependencies = [
"tracing",
"url",
"utils",
"uuid",
"wal_decoder",
"walkdir",
"workspace_hack",
@@ -5627,16 +5626,16 @@ dependencies = [
[[package]]
name = "ring"
version = "0.17.13"
version = "0.17.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ac5d832aa16abd7d1def883a8545280c20a60f523a370aa3a9617c2b8550ee"
checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.11",
"libc",
"spin",
"untrusted",
"windows-sys 0.52.0",
"windows-sys 0.48.0",
]
[[package]]

View File

@@ -1980,10 +1980,12 @@ COPY --from=sql_exporter_preprocessor --chmod=0644 /home/nonroot/compute/etc/neo
RUN echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
# rsyslog config permissions
# directory for rsyslogd pid file
RUN mkdir /var/run/rsyslogd && \
chown -R postgres:postgres /var/run/rsyslogd && \
chown -R postgres:postgres /etc/rsyslog.d/
RUN chown postgres:postgres /etc/rsyslog.conf && \
touch /etc/compute_rsyslog.conf && \
chown -R postgres:postgres /etc/compute_rsyslog.conf && \
# directory for rsyslogd pid file
mkdir /var/run/rsyslogd && \
chown -R postgres:postgres /var/run/rsyslogd
ENV LANG=en_US.utf8

View File

@@ -1,5 +1 @@
SELECT sum(pg_database_size(datname)) AS total
FROM pg_database
-- Ignore invalid databases, as we will likely have problems with
-- getting their size from the Pageserver.
WHERE datconnlimit != -2;
SELECT sum(pg_database_size(datname)) AS total FROM pg_database;

View File

@@ -1,20 +1,10 @@
-- We export stats for 10 non-system databases. Without this limit it is too
-- easy to abuse the system by creating lots of databases.
SELECT pg_database_size(datname) AS db_size,
deadlocks,
tup_inserted AS inserted,
tup_updated AS updated,
tup_deleted AS deleted,
datname
SELECT pg_database_size(datname) AS db_size, deadlocks, tup_inserted AS inserted,
tup_updated AS updated, tup_deleted AS deleted, datname
FROM pg_stat_database
WHERE datname IN (
SELECT datname FROM pg_database
-- Ignore invalid databases, as we will likely have problems with
-- getting their size from the Pageserver.
WHERE datconnlimit != -2
AND datname <> 'postgres'
AND NOT datistemplate
ORDER BY oid
LIMIT 10
WHERE datname <> 'postgres' AND NOT datistemplate ORDER BY oid LIMIT 10
);

View File

@@ -39,10 +39,6 @@ commands:
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
- name: rsyslogd
user: postgres
sysvInitAction: respawn
shell: '/usr/sbin/rsyslogd -n -i /var/run/rsyslogd/rsyslogd.pid -f /etc/compute_rsyslog.conf'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
@@ -73,12 +69,6 @@ files:
}
memory {}
}
# Create dummy rsyslog config, because it refuses to start without at least one action configured.
# compute_ctl will rewrite this file with the actual configuration, if needed.
- filename: compute_rsyslog.conf
content: |
*.* /dev/null
$IncludeConfig /etc/rsyslog.d/*.conf
build: |
# Build cgroup-tools
#
@@ -142,12 +132,6 @@ merge: |
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
RUN chmod 0666 /etc/compute_rsyslog.conf
RUN chmod 0666 /var/log/
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/

View File

@@ -39,10 +39,6 @@ commands:
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
- name: rsyslogd
user: postgres
sysvInitAction: respawn
shell: '/usr/sbin/rsyslogd -n -i /var/run/rsyslogd/rsyslogd.pid -f /etc/compute_rsyslog.conf'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
@@ -73,12 +69,6 @@ files:
}
memory {}
}
# Create dummy rsyslog config, because it refuses to start without at least one action configured.
# compute_ctl will rewrite this file with the actual configuration, if needed.
- filename: compute_rsyslog.conf
content: |
*.* /dev/null
$IncludeConfig /etc/rsyslog.d/*.conf
build: |
# Build cgroup-tools
#
@@ -138,11 +128,6 @@ merge: |
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
RUN chmod 0666 /etc/compute_rsyslog.conf
RUN chmod 0666 /var/log/
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/

View File

@@ -37,7 +37,7 @@ use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::monitor::launch_monitor;
use crate::pg_helpers::*;
use crate::rsyslog::configure_audit_rsyslog;
use crate::rsyslog::configure_and_start_rsyslog;
use crate::spec::*;
use crate::swap::resize_swap;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
@@ -297,6 +297,79 @@ struct StartVmMonitorResult {
vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
}
pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String {
let roles = spec
.cluster
.roles
.iter()
.map(|r| escape_literal(&r.name))
.collect::<Vec<_>>();
let dbs = spec
.cluster
.databases
.iter()
.map(|db| escape_literal(&db.name))
.collect::<Vec<_>>();
let roles_decl = if roles.is_empty() {
String::from("roles text[] := NULL;")
} else {
format!(
r#"
roles text[] := ARRAY(SELECT rolname
FROM pg_catalog.pg_roles
WHERE rolname IN ({}));"#,
roles.join(", ")
)
};
let database_decl = if dbs.is_empty() {
String::from("dbs text[] := NULL;")
} else {
format!(
r#"
dbs text[] := ARRAY(SELECT datname
FROM pg_catalog.pg_database
WHERE datname IN ({}));"#,
dbs.join(", ")
)
};
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
let query = format!(
r#"
DO $$
DECLARE
r text;
{}
{}
BEGIN
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
THEN
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
IF array_length(roles, 1) IS NOT NULL THEN
EXECUTE format('GRANT neon_superuser TO %s',
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
FOREACH r IN ARRAY roles LOOP
EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
END LOOP;
END IF;
IF array_length(dbs, 1) IS NOT NULL THEN
EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
END IF;
END IF;
END
$$;"#,
roles_decl, database_decl,
);
query
}
impl ComputeNode {
pub fn new(
params: ComputeNodeParams,
@@ -616,7 +689,7 @@ impl ComputeNode {
let log_directory_path = Path::new(&self.params.pgdata).join("log");
// TODO: make this more robust
// now rsyslog starts once and there is no monitoring or restart if it fails
configure_audit_rsyslog(
configure_and_start_rsyslog(
log_directory_path.to_str().unwrap(),
"hipaa",
&remote_endpoint,

View File

@@ -186,40 +186,15 @@ impl DatabaseExt for Database {
/// Postgres SQL queries and DATABASE_URL.
pub trait Escaping {
fn pg_quote(&self) -> String;
fn pg_quote_dollar(&self) -> (String, String);
}
impl Escaping for PgIdent {
/// This is intended to mimic Postgres quote_ident(), but for simplicity it
/// always quotes provided string with `""` and escapes every `"`.
/// **Not idempotent**, i.e. if string is already escaped it will be escaped again.
/// N.B. it's not useful for escaping identifiers that are used inside WHERE
/// clause, use `escape_literal()` instead.
fn pg_quote(&self) -> String {
format!("\"{}\"", self.replace('"', "\"\""))
}
/// This helper is intended to be used for dollar-escaping strings for usage
/// inside PL/pgSQL procedures. In addition to dollar-escaping the string,
/// it also returns a tag that is intended to be used inside the outer
/// PL/pgSQL procedure. If you do not need an outer tag, just discard it.
/// Here we somewhat mimic the logic of Postgres' `pg_get_functiondef()`,
/// <https://github.com/postgres/postgres/blob/8b49392b270b4ac0b9f5c210e2a503546841e832/src/backend/utils/adt/ruleutils.c#L2924>
fn pg_quote_dollar(&self) -> (String, String) {
let mut tag: String = "".to_string();
let mut outer_tag = "x".to_string();
// Find the first suitable tag that is not present in the string.
// Postgres' max role/DB name length is 63 bytes, so even in the
// worst case it won't take long.
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
tag += "x";
outer_tag = tag.clone() + "x";
}
let escaped = format!("${tag}${self}${tag}$");
(escaped, outer_tag)
let result = format!("\"{}\"", self.replace('"', "\"\""));
result
}
}
@@ -251,13 +226,10 @@ pub async fn get_existing_dbs_async(
// invalid state. See:
// https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
let rowstream = client
// We use a subquery instead of a fancy `datdba::regrole::text AS owner`,
// because the latter automatically wraps the result in double quotes,
// if the role name contains special characters.
.query_raw::<str, &String, &[String; 0]>(
"SELECT
datname AS name,
(SELECT rolname FROM pg_roles WHERE oid = datdba) AS owner,
datdba::regrole::text AS owner,
NOT datallowconn AS restrict_conn,
datconnlimit = - 2 AS invalid
FROM

View File

@@ -21,34 +21,40 @@ fn get_rsyslog_pid() -> Option<String> {
}
}
// Restart rsyslogd to apply the new configuration.
// This is necessary, because there is no other way to reload the rsyslog configuration.
//
// Rsyslogd shouldn't lose any messages, because of the restart,
// because it tracks the last read position in the log files
// and will continue reading from that position.
// TODO: test it properly
//
fn restart_rsyslog() -> Result<()> {
let old_pid = get_rsyslog_pid().context("rsyslogd is not running")?;
info!("rsyslogd is running with pid: {}, restart it", old_pid);
// Start rsyslogd with the specified configuration file
// If it is already running, do nothing.
fn start_rsyslog(rsyslog_conf_path: &str) -> Result<()> {
let pid = get_rsyslog_pid();
if let Some(pid) = pid {
info!("rsyslogd is already running with pid: {}", pid);
return Ok(());
}
// kill it to restart
let _ = Command::new("pkill")
.arg("rsyslogd")
let _ = Command::new("/usr/sbin/rsyslogd")
.arg("-f")
.arg(rsyslog_conf_path)
.arg("-i")
.arg("/var/run/rsyslogd/rsyslogd.pid")
.output()
.context("Failed to stop rsyslogd")?;
.context("Failed to start rsyslogd")?;
// Check that rsyslogd is running
if let Some(pid) = get_rsyslog_pid() {
info!("rsyslogd started successfully with pid: {}", pid);
} else {
return Err(anyhow::anyhow!("Failed to start rsyslogd"));
}
Ok(())
}
pub fn configure_audit_rsyslog(
pub fn configure_and_start_rsyslog(
log_directory: &str,
tag: &str,
remote_endpoint: &str,
) -> Result<()> {
let config_content: String = format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
include_str!("config_template/compute_rsyslog_template.conf"),
log_directory = log_directory,
tag = tag,
remote_endpoint = remote_endpoint
@@ -56,7 +62,7 @@ pub fn configure_audit_rsyslog(
info!("rsyslog config_content: {}", config_content);
let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
let rsyslog_conf_path = "/etc/compute_rsyslog.conf";
let mut file = OpenOptions::new()
.create(true)
.write(true)
@@ -65,13 +71,10 @@ pub fn configure_audit_rsyslog(
file.write_all(config_content.as_bytes())?;
info!(
"rsyslog configuration file {} added successfully. Starting rsyslogd",
rsyslog_conf_path
);
info!("rsyslog configuration added successfully. Starting rsyslogd");
// start the service, using the configuration
restart_rsyslog()?;
start_rsyslog(rsyslog_conf_path)?;
Ok(())
}

View File

@@ -13,17 +13,16 @@ use tokio_postgres::Client;
use tokio_postgres::error::SqlState;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use crate::compute::{ComputeNode, ComputeState};
use crate::compute::{ComputeNode, ComputeState, construct_superuser_query};
use crate::pg_helpers::{
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async,
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal, get_existing_dbs_async,
get_existing_roles_async,
};
use crate::spec_apply::ApplySpecPhase::{
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateNeonSuperuser,
CreatePgauditExtension, CreatePgauditlogtofileExtension, CreateSchemaNeon,
DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension,
CreatePgauditlogtofileExtension, CreateSchemaNeon, CreateSuperUser, DisablePostgresDBPgAudit,
DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, HandleNeonExtension,
HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
@@ -188,7 +187,7 @@ impl ComputeNode {
}
for phase in [
CreateNeonSuperuser,
CreateSuperUser,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
@@ -469,7 +468,7 @@ pub enum PerDatabasePhase {
#[derive(Clone, Debug)]
pub enum ApplySpecPhase {
CreateNeonSuperuser,
CreateSuperUser,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
@@ -596,10 +595,14 @@ async fn get_operations<'a>(
apply_spec_phase: &'a ApplySpecPhase,
) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
match apply_spec_phase {
ApplySpecPhase::CreateNeonSuperuser => Ok(Box::new(once(Operation {
query: include_str!("sql/create_neon_superuser.sql").to_string(),
comment: None,
}))),
ApplySpecPhase::CreateSuperUser => {
let query = construct_superuser_query(spec);
Ok(Box::new(once(Operation {
query,
comment: None,
})))
}
ApplySpecPhase::DropInvalidDatabases => {
let mut ctx = ctx.write().await;
let databases = &mut ctx.dbs;
@@ -733,15 +736,14 @@ async fn get_operations<'a>(
// We do not check whether the DB exists or not,
// Postgres will take care of it for us
"delete_db" => {
let (db_name, outer_tag) = op.name.pg_quote_dollar();
// In Postgres we can't drop a database if it is a template.
// So we need to unset the template flag first, but it could
// be a retry, so we could've already dropped the database.
// Check that database exists first to make it idempotent.
let unset_template_query: String = format!(
include_str!("sql/unset_template_for_drop_dbs.sql"),
datname = db_name,
outer_tag = outer_tag,
datname_str = escape_literal(&op.name),
datname = &op.name.pg_quote()
);
// Use FORCE to drop database even if there are active connections.
@@ -848,8 +850,6 @@ async fn get_operations<'a>(
comment: None,
},
Operation {
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on the database
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
query: format!(
"GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
db.name.pg_quote()
@@ -909,11 +909,9 @@ async fn get_operations<'a>(
PerDatabasePhase::DropLogicalSubscriptions => {
match &db {
DB::UserDB(db) => {
let (db_name, outer_tag) = db.name.pg_quote_dollar();
let drop_subscription_query: String = format!(
include_str!("sql/drop_subscriptions.sql"),
datname_str = db_name,
outer_tag = outer_tag,
datname_str = escape_literal(&db.name),
);
let operations = vec![Operation {
@@ -952,7 +950,6 @@ async fn get_operations<'a>(
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
DB::UserDB(db) => db.owner.pg_quote(),
};
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
Some(vec![
// This will reassign all dependent objects to the db owner
@@ -967,9 +964,7 @@ async fn get_operations<'a>(
Operation {
query: format!(
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
role_name = escaped_role,
outer_tag = outer_tag,
role_name = quoted,
),
comment: None,
},
@@ -994,14 +989,12 @@ async fn get_operations<'a>(
DB::SystemDB => return Ok(Box::new(empty())),
DB::UserDB(db) => db,
};
let (db_owner, outer_tag) = db.owner.pg_quote_dollar();
let operations = vec![
Operation {
query: format!(
include_str!("sql/set_public_schema_owner.sql"),
db_owner = db_owner,
outer_tag = outer_tag,
db_owner = db.owner.pg_quote()
),
comment: None,
},

View File

@@ -1,8 +0,0 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
THEN
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
END IF;
END
$$;

View File

@@ -1,4 +1,4 @@
DO ${outer_tag}$
DO $$
DECLARE
subname TEXT;
BEGIN
@@ -9,4 +9,4 @@ BEGIN
EXECUTE format('DROP SUBSCRIPTION %I;', subname);
END LOOP;
END;
${outer_tag}$;
$$;

View File

@@ -1,6 +1,6 @@
SET SESSION ROLE neon_superuser;
DO ${outer_tag}$
DO $$
DECLARE
schema TEXT;
revoke_query TEXT;
@@ -16,15 +16,13 @@ BEGIN
WHERE schema_name IN ('public')
LOOP
revoke_query := format(
'REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA %I FROM %I GRANTED BY neon_superuser;',
schema,
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
{role_name}
'REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA %I FROM {role_name} GRANTED BY neon_superuser;',
schema
);
EXECUTE revoke_query;
END LOOP;
END;
${outer_tag}$;
$$;
RESET ROLE;

View File

@@ -1,4 +1,5 @@
DO ${outer_tag}$
DO
$$
DECLARE
schema_owner TEXT;
BEGIN
@@ -15,8 +16,8 @@ DO ${outer_tag}$
IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'
THEN
EXECUTE format('ALTER SCHEMA public OWNER TO %I', {db_owner});
ALTER SCHEMA public OWNER TO {db_owner};
END IF;
END IF;
END
${outer_tag}$;
$$;

View File

@@ -1,12 +1,12 @@
DO ${outer_tag}$
DO $$
BEGIN
IF EXISTS(
SELECT 1
FROM pg_catalog.pg_database
WHERE datname = {datname}
WHERE datname = {datname_str}
)
THEN
EXECUTE format('ALTER DATABASE %I is_template false', {datname});
ALTER DATABASE {datname} is_template false;
END IF;
END
${outer_tag}$;
$$;

View File

@@ -61,23 +61,6 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
assert_eq!(ident.pg_quote(), "\"\"\"name\"\";\\n select 1;\"");
}
#[test]
fn ident_pg_quote_dollar() {
let test_cases = vec![
("name", ("$$name$$", "x")),
("name$$", ("$x$name$$$x$", "xx")),
("name$$$", ("$x$name$$$$x$", "xx")),
("name$$$$", ("$x$name$$$$$x$", "xx")),
("name$x$", ("$xx$name$x$$xx$", "xxx")),
];
for (input, expected) in test_cases {
let (escaped, tag) = PgIdent::from(input).pg_quote_dollar();
assert_eq!(escaped, expected.0);
assert_eq!(tag, expected.1);
}
}
#[test]
fn generic_options_search() {
let generic_options: GenericOptions = Some(vec![

View File

@@ -1146,15 +1146,6 @@ pub struct TimelineArchivalConfigRequest {
pub state: TimelineArchivalState,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct TimelinePatchIndexPartRequest {
pub rel_size_migration: Option<RelSizeMigration>,
pub gc_compaction_last_completed_lsn: Option<Lsn>,
pub applied_gc_cutoff_lsn: Option<Lsn>,
#[serde(default)]
pub force_index_update: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelinesInfoAndOffloaded {
pub timelines: Vec<TimelineInfo>,

View File

@@ -98,7 +98,6 @@ criterion.workspace = true
hex-literal.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
indoc.workspace = true
uuid.workspace = true
[[bench]]
name = "bench_layer_map"

View File

@@ -456,8 +456,8 @@ impl PageServerConf {
no_sync: no_sync.unwrap_or(false),
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
load_previous_heatmap: load_previous_heatmap.unwrap_or(true),
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(true),
load_previous_heatmap: load_previous_heatmap.unwrap_or(false),
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(false),
};
// ------------------------------------------------------------
@@ -491,9 +491,7 @@ impl PageServerConf {
#[cfg(test)]
pub fn test_repo_dir(test_name: &str) -> Utf8PathBuf {
let test_output_dir = std::env::var("TEST_OUTPUT").unwrap_or("../tmp_check".into());
let test_id = uuid::Uuid::new_v4();
Utf8PathBuf::from(format!("{test_output_dir}/test_{test_name}_{test_id}"))
Utf8PathBuf::from(format!("{test_output_dir}/test_{test_name}"))
}
pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {

View File

@@ -37,8 +37,7 @@ use pageserver_api::models::{
TenantShardSplitResponse, TenantSorting, TenantState, TenantWaitLsnRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateRequestMode,
TimelineCreateRequestModeImportPgdata, TimelineGcRequest, TimelineInfo,
TimelinePatchIndexPartRequest, TimelinesInfoAndOffloaded, TopTenantShardItem,
TopTenantShardsRequest, TopTenantShardsResponse,
TimelinesInfoAndOffloaded, TopTenantShardItem, TopTenantShardsRequest, TopTenantShardsResponse,
};
use pageserver_api::shard::{ShardCount, TenantShardId};
use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError};
@@ -64,7 +63,6 @@ use crate::tenant::mgr::{
GetActiveTenantError, GetTenantError, TenantManager, TenantMapError, TenantMapInsertError,
TenantSlot, TenantSlotError, TenantSlotUpsertError, TenantStateError, UpsertLocationError,
};
use crate::tenant::remote_timeline_client::index::GcCompactionState;
use crate::tenant::remote_timeline_client::{
download_index_part, list_remote_tenant_shards, list_remote_timelines,
};
@@ -860,75 +858,6 @@ async fn timeline_archival_config_handler(
json_response(StatusCode::OK, ())
}
/// This API is used to patch the index part of a timeline. You must ensure such patches are safe to apply. Use this API as an emergency
/// measure only.
///
/// Some examples of safe patches:
/// - Increase the gc_cutoff and gc_compaction_cutoff to a larger value in case of a bug that didn't bump the cutoff and cause read errors.
/// - Force set the index part to use reldir v2 (migrating/migrated).
///
/// Some examples of unsafe patches:
/// - Force set the index part from v2 to v1 (legacy). This will cause the code path to ignore anything written to the new keyspace and cause
/// errors.
/// - Decrease the gc_cutoff without validating the data really exists. It will cause read errors in the background.
async fn timeline_patch_index_part_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let request_data: TimelinePatchIndexPartRequest = json_request(&mut request).await?;
check_permission(&request, None)?; // require global permission for this request
let state = get_state(&request);
async {
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
if let Some(rel_size_migration) = request_data.rel_size_migration {
timeline
.update_rel_size_v2_status(rel_size_migration)
.map_err(ApiError::InternalServerError)?;
}
if let Some(gc_compaction_last_completed_lsn) =
request_data.gc_compaction_last_completed_lsn
{
timeline
.update_gc_compaction_state(GcCompactionState {
last_completed_lsn: gc_compaction_last_completed_lsn,
})
.map_err(ApiError::InternalServerError)?;
}
if let Some(applied_gc_cutoff_lsn) = request_data.applied_gc_cutoff_lsn {
{
let guard = timeline.applied_gc_cutoff_lsn.lock_for_write();
guard.store_and_unlock(applied_gc_cutoff_lsn);
}
}
if request_data.force_index_update {
timeline
.remote_client
.force_schedule_index_upload()
.context("force schedule index upload")
.map_err(ApiError::InternalServerError)?;
}
Ok::<_, ApiError>(())
}
.instrument(info_span!("timeline_patch_index_part",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug(),
%timeline_id))
.await?;
json_response(StatusCode::OK, ())
}
async fn timeline_detail_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -3700,10 +3629,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn",
|r| api_handler(r, get_timestamp_of_lsn_handler),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/patch_index_part",
|r| api_handler(r, timeline_patch_index_part_handler),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/lsn_lease",
|r| api_handler(r, lsn_lease_handler),

View File

@@ -143,29 +143,6 @@ pub(crate) static LAYERS_PER_READ_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static LAYERS_PER_READ_BATCH_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_layers_per_read_batch_global",
"Layers visited to serve a single read batch (read amplification), regardless of number of reads.",
vec![
1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0
],
)
.expect("failed to define a metric")
});
pub(crate) static LAYERS_PER_READ_AMORTIZED_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_layers_per_read_amortized_global",
"Layers visited to serve a single read (read amplification). Amortized across a batch: \
all visited layers are divided by number of reads.",
vec![
1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0
],
)
.expect("failed to define a metric")
});
pub(crate) static DELTAS_PER_READ_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
// We expect this to be low because of Postgres checkpoints. Let's see if that holds.
register_histogram!(
@@ -4097,8 +4074,6 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
// histograms
[
&LAYERS_PER_READ_GLOBAL,
&LAYERS_PER_READ_BATCH_GLOBAL,
&LAYERS_PER_READ_AMORTIZED_GLOBAL,
&DELTAS_PER_READ_GLOBAL,
&WAIT_LSN_TIME,
&WAL_REDO_TIME,

View File

@@ -954,14 +954,6 @@ impl RemoteTimelineClient {
Ok(())
}
/// Only used in the `patch_index_part` HTTP API to force trigger an index upload.
pub fn force_schedule_index_upload(self: &Arc<Self>) -> Result<(), NotInitialized> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_index_upload(upload_queue);
Ok(())
}
/// Launch an index-file upload operation in the background (internal function)
fn schedule_index_upload(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();

View File

@@ -869,7 +869,8 @@ impl<'a> TenantDownloader<'a> {
let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
let layers_in_heatmap = heatmap_timeline
.hot_layers()
.layers
.iter()
.map(|l| (&l.name, l.metadata.generation))
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
@@ -1014,8 +1015,7 @@ impl<'a> TenantDownloader<'a> {
// Accumulate updates to the state
let mut touched = Vec::new();
let timeline_id = timeline.timeline_id;
for layer in timeline.into_hot_layers() {
for layer in timeline.layers {
if self.secondary_state.cancel.is_cancelled() {
tracing::debug!("Cancelled -- dropping out of layer loop");
return (Err(UpdateError::Cancelled), touched);
@@ -1040,7 +1040,7 @@ impl<'a> TenantDownloader<'a> {
}
match self
.download_layer(tenant_shard_id, &timeline_id, layer, ctx)
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
.await
{
Ok(Some(layer)) => touched.push(layer),
@@ -1148,7 +1148,7 @@ impl<'a> TenantDownloader<'a> {
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
let timeline_id = timeline.timeline_id;
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
let (result, touched) = self
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
@@ -1316,11 +1316,11 @@ async fn init_timeline_state(
// As we iterate through layers found on disk, we will look up their metadata from this map.
// Layers not present in metadata will be discarded.
let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
heatmap.hot_layers().map(|l| (&l.name, l)).collect();
heatmap.layers.iter().map(|l| (&l.name, l)).collect();
let last_heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
if let Some(last_heatmap) = last_heatmap {
last_heatmap.hot_layers().map(|l| (&l.name, l)).collect()
last_heatmap.layers.iter().map(|l| (&l.name, l)).collect()
} else {
HashMap::new()
};

View File

@@ -42,7 +42,7 @@ pub(crate) struct HeatMapTimeline {
#[serde_as(as = "DisplayFromStr")]
pub(crate) timeline_id: TimelineId,
layers: Vec<HeatMapLayer>,
pub(crate) layers: Vec<HeatMapLayer>,
}
#[serde_as]
@@ -53,10 +53,8 @@ pub(crate) struct HeatMapLayer {
#[serde_as(as = "TimestampSeconds<i64>")]
pub(crate) access_time: SystemTime,
#[serde(default)]
pub(crate) cold: bool, // TODO: an actual 'heat' score that would let secondary locations prioritize downloading
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
}
impl HeatMapLayer {
@@ -64,13 +62,11 @@ impl HeatMapLayer {
name: LayerName,
metadata: LayerFileMetadata,
access_time: SystemTime,
cold: bool,
) -> Self {
Self {
name,
metadata,
access_time,
cold,
}
}
}
@@ -82,18 +78,6 @@ impl HeatMapTimeline {
layers,
}
}
pub(crate) fn into_hot_layers(self) -> impl Iterator<Item = HeatMapLayer> {
self.layers.into_iter().filter(|l| !l.cold)
}
pub(crate) fn hot_layers(&self) -> impl Iterator<Item = &HeatMapLayer> {
self.layers.iter().filter(|l| !l.cold)
}
pub(crate) fn all_layers(&self) -> impl Iterator<Item = &HeatMapLayer> {
self.layers.iter()
}
}
pub(crate) struct HeatMapStats {
@@ -108,7 +92,7 @@ impl HeatMapTenant {
layers: 0,
};
for timeline in &self.timelines {
for layer in timeline.hot_layers() {
for layer in &timeline.layers {
stats.layers += 1;
stats.bytes += layer.metadata.file_size;
}

View File

@@ -1563,10 +1563,10 @@ impl LayerInner {
self.access_stats.record_residence_event();
*self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
self.status.as_ref().unwrap().send_replace(Status::Evicted);
*self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
Ok(())
}

View File

@@ -99,8 +99,7 @@ use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate,
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::l0_flush::{self, L0FlushGlobalState};
use crate::metrics::{
DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_AMORTIZED_GLOBAL, LAYERS_PER_READ_BATCH_GLOBAL,
LAYERS_PER_READ_GLOBAL, ScanLatencyOngoingRecording, TimelineMetrics,
DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_GLOBAL, ScanLatencyOngoingRecording, TimelineMetrics,
};
use crate::page_service::TenantManagerTypes;
use crate::pgdatadir_mapping::{
@@ -1331,6 +1330,10 @@ impl Timeline {
// (this is a requirement, not a bug). Skip updating the metric in these cases
// to avoid infinite results.
if !results.is_empty() {
// Record the total number of layers visited towards each key in the batch. While some
// layers may not intersect with a given read, and the cost of layer visits are
// amortized across the batch, each visited layer contributes directly to the observed
// latency for every read in the batch, which is what we care about.
if layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD {
static LOG_PACER: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(60))));
@@ -1345,23 +1348,9 @@ impl Timeline {
});
}
// Records the number of layers visited in a few different ways:
//
// * LAYERS_PER_READ: all layers count towards every read in the batch, because each
// layer directly affects its observed latency.
//
// * LAYERS_PER_READ_BATCH: all layers count towards each batch, to get the per-batch
// layer visits and access cost.
//
// * LAYERS_PER_READ_AMORTIZED: the average layer count per read, to get the amortized
// read amplification after batching.
let layers_visited = layers_visited as f64;
let avg_layers_visited = layers_visited / results.len() as f64;
LAYERS_PER_READ_BATCH_GLOBAL.observe(layers_visited);
for _ in &results {
self.metrics.layers_per_read.observe(layers_visited);
LAYERS_PER_READ_GLOBAL.observe(layers_visited);
LAYERS_PER_READ_AMORTIZED_GLOBAL.observe(avg_layers_visited);
self.metrics.layers_per_read.observe(layers_visited as f64);
LAYERS_PER_READ_GLOBAL.observe(layers_visited as f64);
}
}
@@ -3648,7 +3637,7 @@ impl Timeline {
let visible_non_resident = match previous_heatmap.as_deref() {
Some(PreviousHeatmap::Active {
heatmap, read_at, ..
}) => Some(heatmap.all_layers().filter_map(|hl| {
}) => Some(heatmap.layers.iter().filter_map(|hl| {
let desc: PersistentLayerDesc = hl.name.clone().into();
let layer = guard.try_get_from_key(&desc.key())?;
@@ -3664,7 +3653,7 @@ impl Timeline {
return None;
}
Some((desc, hl.metadata.clone(), hl.access_time, hl.cold))
Some((desc, hl.metadata.clone(), hl.access_time))
})),
Some(PreviousHeatmap::Obsolete) => None,
None => None,
@@ -3680,7 +3669,6 @@ impl Timeline {
layer.layer_desc().clone(),
layer.metadata(),
last_activity_ts,
false, // these layers are not cold
))
}
LayerVisibilityHint::Covered => {
@@ -3707,14 +3695,12 @@ impl Timeline {
// Sort layers in order of which to download first. For a large set of layers to download, we
// want to prioritize those layers which are most likely to still be in the resident many minutes
// or hours later:
// - Cold layers go last for convenience when a human inspects the heatmap.
// - Download L0s last, because they churn the fastest: L0s on a fast-writing tenant might
// only exist for a few minutes before being compacted into L1s.
// - For L1 & image layers, download most recent LSNs first: the older the LSN, the sooner
// the layer is likely to be covered by an image layer during compaction.
layers.sort_by_key(|(desc, _meta, _atime, cold)| {
layers.sort_by_key(|(desc, _meta, _atime)| {
std::cmp::Reverse((
*cold,
!LayerMap::is_l0(&desc.key_range, desc.is_delta),
desc.lsn_range.end,
))
@@ -3722,9 +3708,7 @@ impl Timeline {
let layers = layers
.into_iter()
.map(|(desc, meta, atime, cold)| {
HeatMapLayer::new(desc.layer_name(), meta, atime, cold)
})
.map(|(desc, meta, atime)| HeatMapLayer::new(desc.layer_name(), meta, atime))
.collect();
Some(HeatMapTimeline::new(self.timeline_id, layers))
@@ -3744,7 +3728,6 @@ impl Timeline {
name: vl.layer_desc().layer_name(),
metadata: vl.metadata(),
access_time: now,
cold: true,
};
heatmap_layers.push(hl);
}
@@ -7046,7 +7029,6 @@ mod tests {
use pageserver_api::key::Key;
use pageserver_api::value::Value;
use std::iter::Iterator;
use tracing::Instrument;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -7060,8 +7042,8 @@ mod tests {
use crate::tenant::{PreviousHeatmap, Timeline};
fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) {
assert_eq!(lhs.all_layers().count(), rhs.all_layers().count());
let lhs_rhs = lhs.all_layers().zip(rhs.all_layers());
assert_eq!(lhs.layers.len(), rhs.layers.len());
let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter());
for (l, r) in lhs_rhs {
assert_eq!(l.name, r.name);
assert_eq!(l.metadata, r.metadata);
@@ -7139,11 +7121,10 @@ mod tests {
assert_eq!(heatmap.timeline_id, timeline.timeline_id);
// L0 should come last
let heatmap_layers = heatmap.all_layers().collect::<Vec<_>>();
assert_eq!(heatmap_layers.last().unwrap().name, l0_delta.layer_name());
assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name());
let mut last_lsn = Lsn::MAX;
for layer in heatmap_layers {
for layer in &heatmap.layers {
// Covered layer should be omitted
assert!(layer.name != covered_delta.layer_name());
@@ -7272,7 +7253,7 @@ mod tests {
.expect("Infallible while timeline is not shut down");
// Both layers should be in the heatmap
assert!(heatmap.all_layers().count() > 0);
assert!(!heatmap.layers.is_empty());
// Now simulate a migration.
timeline
@@ -7298,7 +7279,7 @@ mod tests {
.await
.expect("Infallible while timeline is not shut down");
assert_eq!(post_eviction_heatmap.all_layers().count(), 0);
assert!(post_eviction_heatmap.layers.is_empty());
assert!(matches!(
timeline.previous_heatmap.load().as_deref(),
Some(PreviousHeatmap::Obsolete)

View File

@@ -7,7 +7,6 @@
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::Instant;
use super::layer_manager::LayerManager;
use super::{
@@ -20,7 +19,6 @@ use anyhow::{Context, anyhow};
use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use futures::FutureExt;
use itertools::Itertools;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
@@ -444,7 +442,6 @@ impl GcCompactionQueue {
));
};
let has_pending_tasks;
let mut yield_for_l0 = false;
let Some((id, item)) = ({
let mut guard = self.inner.lock().unwrap();
if let Some((id, item)) = guard.queued.pop_front() {
@@ -494,23 +491,13 @@ impl GcCompactionQueue {
let mut guard = self.inner.lock().unwrap();
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
}
let compaction_result =
timeline.compact_with_options(cancel, options, ctx).await?;
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
self.notify_and_unblock(id);
if compaction_result == CompactionOutcome::YieldForL0 {
yield_for_l0 = true;
}
}
}
GcCompactionQueueItem::SubCompactionJob(options) => {
// TODO: error handling, clear the queue if any task fails?
let compaction_result = timeline.compact_with_options(cancel, options, ctx).await?;
if compaction_result == CompactionOutcome::YieldForL0 {
// We will permenantly give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
// again. This ensures that we don't keep doing duplicated work within gc-compaction. Not directly returning here because
// we need to clean things up before returning from the function.
yield_for_l0 = true;
}
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
}
GcCompactionQueueItem::Notify(id, l2_lsn) => {
self.notify_and_unblock(id);
@@ -539,10 +526,7 @@ impl GcCompactionQueue {
let mut guard = self.inner.lock().unwrap();
guard.running = None;
}
Ok(if yield_for_l0 {
tracing::info!("give up gc-compaction: yield for L0 compaction");
CompactionOutcome::YieldForL0
} else if has_pending_tasks {
Ok(if has_pending_tasks {
CompactionOutcome::Pending
} else {
CompactionOutcome::Done
@@ -740,41 +724,17 @@ struct CompactionStatisticsNumSize {
#[derive(Debug, Serialize, Default)]
pub struct CompactionStatistics {
/// Delta layer visited (maybe compressed, physical size)
delta_layer_visited: CompactionStatisticsNumSize,
/// Image layer visited (maybe compressed, physical size)
image_layer_visited: CompactionStatisticsNumSize,
/// Delta layer produced (maybe compressed, physical size)
delta_layer_produced: CompactionStatisticsNumSize,
/// Image layer produced (maybe compressed, physical size)
image_layer_produced: CompactionStatisticsNumSize,
/// Delta layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
delta_layer_discarded: CompactionStatisticsNumSize,
/// Image layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
image_layer_discarded: CompactionStatisticsNumSize,
num_delta_layer_discarded: usize,
num_image_layer_discarded: usize,
num_unique_keys_visited: usize,
/// Delta visited (uncompressed, original size)
wal_keys_visited: CompactionStatisticsNumSize,
/// Image visited (uncompressed, original size)
image_keys_visited: CompactionStatisticsNumSize,
/// Delta produced (uncompressed, original size)
wal_produced: CompactionStatisticsNumSize,
/// Image produced (uncompressed, original size)
image_produced: CompactionStatisticsNumSize,
// Time spent in each phase
time_acquire_lock_secs: f64,
time_analyze_secs: f64,
time_download_layer_secs: f64,
time_main_loop_secs: f64,
time_final_phase_secs: f64,
time_total_secs: f64,
// Summary
/// Ratio of the key-value size before/after gc-compaction.
uncompressed_size_ratio: f64,
/// Ratio of the physical size before/after gc-compaction.
physical_size_ratio: f64,
}
impl CompactionStatistics {
@@ -824,13 +784,11 @@ impl CompactionStatistics {
self.image_produced.num += 1;
self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
}
fn discard_delta_layer(&mut self, original_size: u64) {
self.delta_layer_discarded.num += 1;
self.delta_layer_discarded.size += original_size;
fn discard_delta_layer(&mut self) {
self.num_delta_layer_discarded += 1;
}
fn discard_image_layer(&mut self, original_size: u64) {
self.image_layer_discarded.num += 1;
self.image_layer_discarded.size += original_size;
fn discard_image_layer(&mut self) {
self.num_image_layer_discarded += 1;
}
fn produce_delta_layer(&mut self, size: u64) {
self.delta_layer_produced.num += 1;
@@ -840,19 +798,6 @@ impl CompactionStatistics {
self.image_layer_produced.num += 1;
self.image_layer_produced.size += size;
}
fn finalize(&mut self) {
let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
self.uncompressed_size_ratio =
original_key_value_size as f64 / (produced_key_value_size as f64 + 1.0); // avoid div by 0
let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
let produced_physical_size = self.image_layer_produced.size
+ self.delta_layer_produced.size
+ self.image_layer_discarded.size
+ self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
self.physical_size_ratio =
original_physical_size as f64 / (produced_physical_size as f64 + 1.0); // avoid div by 0
}
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
@@ -2613,7 +2558,7 @@ impl Timeline {
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> Result<CompactionOutcome, CompactionError> {
) -> Result<(), CompactionError> {
let sub_compaction = options.sub_compaction;
let job = GcCompactJob::from_compact_options(options.clone());
if sub_compaction {
@@ -2635,7 +2580,7 @@ impl Timeline {
if jobs_len == 0 {
info!("no jobs to run, skipping gc bottom-most compaction");
}
return Ok(CompactionOutcome::Done);
return Ok(());
}
self.compact_with_gc_inner(cancel, job, ctx).await
}
@@ -2645,14 +2590,11 @@ impl Timeline {
cancel: &CancellationToken,
job: GcCompactJob,
ctx: &RequestContext,
) -> Result<CompactionOutcome, CompactionError> {
) -> Result<(), CompactionError> {
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
let timer = Instant::now();
let begin_timer = timer;
let gc_lock = async {
tokio::select! {
guard = self.gc_lock.lock() => Ok(guard),
@@ -2660,9 +2602,6 @@ impl Timeline {
}
};
let time_acquire_lock = timer.elapsed();
let timer = Instant::now();
let gc_lock = crate::timed(
gc_lock,
"acquires gc lock",
@@ -2714,7 +2653,7 @@ impl Timeline {
tracing::warn!(
"no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
);
return Ok(CompactionOutcome::Skipped);
return Ok(());
}
real_gc_cutoff
} else {
@@ -2752,7 +2691,7 @@ impl Timeline {
"no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
gc_cutoff
);
return Ok(CompactionOutcome::Done);
return Ok(());
};
// Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
// the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
@@ -2773,7 +2712,7 @@ impl Timeline {
"no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
compact_lsn_range.end
);
return Ok(CompactionOutcome::Done);
return Ok(());
};
// Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
// layers to compact.
@@ -2799,7 +2738,7 @@ impl Timeline {
"no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
gc_cutoff, compact_key_range.start, compact_key_range.end
);
return Ok(CompactionOutcome::Done);
return Ok(());
}
retain_lsns_below_horizon.sort();
GcCompactionJobDescription {
@@ -2852,9 +2791,6 @@ impl Timeline {
has_data_below,
);
let time_analyze = timer.elapsed();
let timer = Instant::now();
for layer in &job_desc.selected_layers {
debug!("read layer: {}", layer.layer_desc().key());
}
@@ -2914,15 +2850,6 @@ impl Timeline {
if cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
}
let resident_layer = layer
.download_and_keep_resident(ctx)
.await
@@ -2966,9 +2893,6 @@ impl Timeline {
.context("failed to create filter iterator")
.map_err(CompactionError::Other)?;
let time_download_layer = timer.elapsed();
let timer = Instant::now();
// Step 2: Produce images+deltas.
let mut accumulated_values = Vec::new();
let mut last_key: Option<Key> = None;
@@ -3043,8 +2967,6 @@ impl Timeline {
// the key and LSN range are determined. However, to keep things simple here, we still
// create this writer, and discard the writer in the end.
let mut keys_processed = 0;
while let Some(((key, lsn, val), desc)) = merge_iter
.next_with_trace()
.await
@@ -3054,18 +2976,6 @@ impl Timeline {
if cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
keys_processed += 1;
if keys_processed % 1000 == 0 {
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
}
}
if self.shard_identity.is_key_disposable(&key) {
// If this shard does not need to store this key, simply skip it.
//
@@ -3204,9 +3114,6 @@ impl Timeline {
.map_err(CompactionError::Other)?;
// end: move the above part to the loop body
let time_main_loop = timer.elapsed();
let timer = Instant::now();
let mut rewrote_delta_layers = Vec::new();
for (key, writers) in delta_layer_rewriters {
if let Some(delta_writer_before) = writers.before {
@@ -3271,13 +3178,6 @@ impl Timeline {
let mut keep_layers = HashSet::new();
let produced_delta_layers_len = produced_delta_layers.len();
let produced_image_layers_len = produced_image_layers.len();
let layer_selection_by_key = job_desc
.selected_layers
.iter()
.map(|l| (l.layer_desc().key(), l.layer_desc().clone()))
.collect::<HashMap<_, _>>();
for action in produced_delta_layers {
match action {
BatchWriterResult::Produced(layer) => {
@@ -3291,16 +3191,8 @@ impl Timeline {
if cfg!(debug_assertions) {
info!("discarded delta layer: {}", l);
}
if let Some(layer_desc) = layer_selection_by_key.get(&l) {
stat.discard_delta_layer(layer_desc.file_size());
} else {
tracing::warn!(
"discarded delta layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
l
);
stat.discard_delta_layer(0);
}
keep_layers.insert(l);
stat.discard_delta_layer();
}
}
}
@@ -3309,9 +3201,6 @@ impl Timeline {
"produced rewritten delta layer: {}",
layer.layer_desc().key()
);
// For now, we include rewritten delta layer size in the "produce_delta_layer". We could
// make it a separate statistics in the future.
stat.produce_delta_layer(layer.layer_desc().file_size());
}
compact_to.extend(rewrote_delta_layers);
for action in produced_image_layers {
@@ -3323,16 +3212,8 @@ impl Timeline {
}
BatchWriterResult::Discarded(l) => {
debug!("discarded image layer: {}", l);
if let Some(layer_desc) = layer_selection_by_key.get(&l) {
stat.discard_image_layer(layer_desc.file_size());
} else {
tracing::warn!(
"discarded image layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
l
);
stat.discard_image_layer(0);
}
keep_layers.insert(l);
stat.discard_image_layer();
}
}
}
@@ -3380,16 +3261,6 @@ impl Timeline {
layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
let time_final_phase = timer.elapsed();
stat.time_final_phase_secs = time_final_phase.as_secs_f64();
stat.time_main_loop_secs = time_main_loop.as_secs_f64();
stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
stat.time_download_layer_secs = time_download_layer.as_secs_f64();
stat.time_analyze_secs = time_analyze.as_secs_f64();
stat.time_total_secs = begin_timer.elapsed().as_secs_f64();
stat.finalize();
info!(
"gc-compaction statistics: {}",
serde_json::to_string(&stat)
@@ -3398,7 +3269,7 @@ impl Timeline {
);
if dry_run {
return Ok(CompactionOutcome::Done);
return Ok(());
}
info!(
@@ -3517,7 +3388,7 @@ impl Timeline {
drop(gc_lock);
Ok(CompactionOutcome::Done)
Ok(())
}
}

View File

@@ -61,11 +61,11 @@ impl HeatmapLayersDownloader {
tracing::info!(
resident_size=%timeline.resident_physical_size(),
heatmap_layers=%heatmap.all_layers().count(),
heatmap_layers=%heatmap.layers.len(),
"Starting heatmap layers download"
);
let stream = futures::stream::iter(heatmap.all_layers().cloned().filter_map(
let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map(
|layer| {
let ctx = ctx.attached_child();
let tl = timeline.clone();

8
poetry.lock generated
View File

@@ -1414,14 +1414,14 @@ files = [
[[package]]
name = "jinja2"
version = "3.1.6"
version = "3.1.5"
description = "A very fast and expressive template engine."
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67"},
{file = "jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d"},
{file = "jinja2-3.1.5-py3-none-any.whl", hash = "sha256:aba0f4dc9ed8013c424088f68a5c226f7d6097ed89b246d7749c2ec4175c6adb"},
{file = "jinja2-3.1.5.tar.gz", hash = "sha256:8fefff8dc3034e27bb80d67c671eb8a9bc424c0ef4c0826edbff304cceff43bb"},
]
[package.dependencies]
@@ -3820,4 +3820,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "010ffce959bb256880ab5a267048c182e4612b3151f9a94e3bf5d3a7807962fe"
content-hash = "9711c5479c867fa614ce3d352f1bbc63dba1cb2376d347f96fbeda6f512ee308"

View File

@@ -286,16 +286,17 @@ where
/// Registers a SpanFields instance as span extension.
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
let csid = self.callsite_id(attrs.metadata().callsite());
let span = ctx.span(id).expect("span must exist");
let fields = SpanFields::default();
fields.record_fields(attrs);
// This could deadlock when there's a panic somewhere in the tracing
// event handling and a read or write guard is still held. This includes
// the OTel subscriber.
let mut exts = span.extensions_mut();
exts.insert(fields);
exts.insert(csid);
}
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
@@ -564,13 +565,6 @@ impl EventFormatter {
)?;
}
let spans = SerializableSpans {
ctx,
callsite_ids,
extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
};
serializer.serialize_entry("spans", &spans)?;
// TODO: thread-local cache?
let pid = std::process::id();
// Skip adding pid 1 to reduce noise for services running in containers.
@@ -620,9 +614,15 @@ impl EventFormatter {
}
}
if spans.extract.has_values() {
// TODO: add fields from event, too?
serializer.serialize_entry("extract", &spans.extract)?;
let stack = SerializableSpans {
ctx,
callsite_ids,
fields: ExtractedSpanFields::<'_, F>::new(extract_fields),
};
serializer.serialize_entry("spans", &stack)?;
if stack.fields.has_values() {
serializer.serialize_entry("extract", &stack.fields)?;
}
serializer.end()
@@ -911,7 +911,7 @@ where
{
ctx: &'a Context<'ctx, Span>,
callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
extract: ExtractedSpanFields<'a, F>,
fields: ExtractedSpanFields<'a, F>,
}
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
@@ -940,7 +940,7 @@ where
serializer.serialize_value(&SerializableSpanFields {
span: &span,
extract: &self.extract,
fields: &self.fields,
})?;
}
}
@@ -955,7 +955,7 @@ where
Span: for<'lookup> LookupSpan<'lookup>,
{
span: &'a SpanRef<'span, Span>,
extract: &'a ExtractedSpanFields<'a, F>,
fields: &'a ExtractedSpanFields<'a, F>,
}
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
@@ -973,7 +973,7 @@ where
for (name, value) in &data.fields.pin() {
serializer.serialize_entry(name, value)?;
// TODO: replace clone with reference, if possible.
self.extract.set(name, value.clone());
self.fields.set(name, value.clone());
}
}

View File

@@ -13,7 +13,7 @@ requests = "^2.32.3"
pytest-xdist = "^3.3.1"
asyncpg = "^0.30.0"
aiopg = "^1.4.0"
Jinja2 = "^3.1.6"
Jinja2 = "^3.1.5"
types-requests = "^2.31.0.0"
types-psycopg2 = "^2.9.21.20241019"
boto3 = "^1.34.11"

View File

@@ -430,10 +430,7 @@ impl InterpretedWalReader {
.with_context(|| "Failed to interpret WAL")?;
for (shard, record) in interpreted {
// Shard zero needs to track the start LSN of the latest record
// in adition to the LSN of the next record to ingest. The former
// is included in basebackup persisted by the compute in WAL.
if !shard.is_shard_zero() && record.is_empty() {
if record.is_empty() {
continue;
}
@@ -743,7 +740,7 @@ mod tests {
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
.await
.unwrap();
let end_pos = end_watch.get();
@@ -886,16 +883,10 @@ mod tests {
let resident_tli = tli.wal_residence_guard().await.unwrap();
let mut next_record_lsns = Vec::default();
let end_watch = Env::write_wal(
tli,
start_lsn,
SIZE,
MSG_COUNT,
c"neon-file:",
Some(&mut next_record_lsns),
)
.await
.unwrap();
let end_watch =
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
.await
.unwrap();
let end_pos = end_watch.get();
let streaming_wal_reader = StreamingWalReader::new(
@@ -1036,16 +1027,10 @@ mod tests {
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let end_watch = Env::write_wal(
tli,
start_lsn,
SIZE,
MSG_COUNT,
c"neon-file:",
Some(&mut next_record_lsns),
)
.await
.unwrap();
let end_watch =
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
.await
.unwrap();
assert!(next_record_lsns.len() > 3);
let shard_0_start_lsn = next_record_lsns[3];
@@ -1139,88 +1124,4 @@ mod tests {
}
}
}
#[tokio::test]
async fn test_shard_zero_does_not_skip_empty_records() {
let _ = env_logger::builder().is_test(true).try_init();
const SIZE: usize = 8 * 1024;
const MSG_COUNT: usize = 10;
const PG_VERSION: u32 = 17;
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
let env = Env::new(true).unwrap();
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
.await
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let mut next_record_lsns = Vec::new();
let end_watch = Env::write_wal(
tli,
start_lsn,
SIZE,
MSG_COUNT,
// This is a logical message prefix that is not persisted to key value storage.
// We use it in order to validate that shard zero receives emtpy interpreted records.
c"test:",
Some(&mut next_record_lsns),
)
.await
.unwrap();
let end_pos = end_watch.get();
let streaming_wal_reader = StreamingWalReader::new(
resident_tli,
None,
start_lsn,
end_pos,
end_watch,
MAX_SEND_SIZE,
);
let shard = ShardIdentity::unsharded();
let (records_tx, mut records_rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
let handle = InterpretedWalReader::spawn(
streaming_wal_reader,
start_lsn,
records_tx,
shard,
PG_VERSION,
&Some("pageserver".to_string()),
);
let mut interpreted_records = Vec::new();
while let Some(batch) = records_rx.recv().await {
interpreted_records.push(batch.records);
if batch.wal_end_lsn == batch.available_wal_end_lsn {
break;
}
}
let received_next_record_lsns = interpreted_records
.into_iter()
.flat_map(|b| b.records)
.map(|rec| rec.next_record_lsn)
.collect::<Vec<_>>();
// By default this also includes the start LSN. Trim it since it shouldn't be received.
let next_record_lsns = next_record_lsns.into_iter().skip(1).collect::<Vec<_>>();
assert_eq!(received_next_record_lsns, next_record_lsns);
handle.abort();
let mut done = false;
for _ in 0..5 {
if handle.current_position().is_none() {
done = true;
break;
}
tokio::time::sleep(Duration::from_millis(1)).await;
}
assert!(done);
}
}

View File

@@ -1,4 +1,3 @@
use std::ffi::CStr;
use std::sync::Arc;
use camino_tempfile::Utf8TempDir;
@@ -125,7 +124,6 @@ impl Env {
start_lsn: Lsn,
msg_size: usize,
msg_count: usize,
prefix: &CStr,
mut next_record_lsns: Option<&mut Vec<Lsn>>,
) -> anyhow::Result<EndWatch> {
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
@@ -135,6 +133,7 @@ impl Env {
WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
let prefix = c"neon-file:";
let prefixlen = prefix.to_bytes_with_nul().len();
assert!(msg_size >= prefixlen);
let message = vec![0; msg_size - prefixlen];

View File

@@ -246,7 +246,7 @@ mod tests {
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
.await
.unwrap();
let end_pos = end_watch.get();

View File

@@ -1174,6 +1174,15 @@ class NeonEnv:
"max_batch_size": 32,
}
if config.test_may_use_compatibility_snapshot_binaries:
log.info(
"Skipping prev heatmap settings to avoid forward-compatibility related test failures"
)
else:
# Look for gaps in WAL received from safekeepeers
ps_cfg["load_previous_heatmap"] = True
ps_cfg["generate_unarchival_heatmap"] = True
get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
if get_vectored_concurrent_io is not None:
ps_cfg["get_vectored_concurrent_io"] = {

View File

@@ -375,19 +375,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
self.verbose_error(res)
def timeline_patch_index_part(
self,
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
data: dict[str, Any],
):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/patch_index_part",
json=data,
)
self.verbose_error(res)
return res.json()
def tenant_location_conf(
self,
tenant_id: TenantId | TenantShardId,

View File

@@ -5,59 +5,34 @@ import logging
import requests
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
TEST_ROLE_NAMES = [
{"name": "neondb_owner"},
{"name": "role with spaces"},
{"name": "role with%20spaces "},
{"name": "role with whitespaces "},
{"name": "injective role with spaces'; SELECT pg_sleep(1000);"},
{"name": "role with #pound-sign and &ampersands=true"},
{"name": "role with emoji 🌍"},
{"name": "role \";with ';injections $$ $x$ $ %I !/\\&#@"},
{"name": '"role in double quotes"'},
{"name": "'role in single quotes'"},
]
TEST_DB_NAMES = [
{
"name": "neondb",
"owner": "neondb_owner",
"owner": "cloud_admin",
},
{
"name": "db with spaces",
"owner": "role with spaces",
"owner": "cloud_admin",
},
{
"name": "db with%20spaces ",
"owner": "role with%20spaces ",
"owner": "cloud_admin",
},
{
"name": "db with whitespaces ",
"owner": "role with whitespaces ",
"owner": "cloud_admin",
},
{
"name": "injective db with spaces'; SELECT pg_sleep(1000);",
"owner": "injective role with spaces'; SELECT pg_sleep(1000);",
"name": "injective db with spaces'; SELECT pg_sleep(10);",
"owner": "cloud_admin",
},
{
"name": "db with #pound-sign and &ampersands=true",
"owner": "role with #pound-sign and &ampersands=true",
"owner": "cloud_admin",
},
{
"name": "db with emoji 🌍",
"owner": "role with emoji 🌍",
},
{
"name": "db \";with ';injections $$ $x$ $ %I !/\\&#@",
"owner": "role \";with ';injections $$ $x$ $ %I !/\\&#@",
},
{
"name": '"db in double quotes"',
"owner": '"role in double quotes"',
},
{
"name": "'db in single quotes'",
"owner": "'role in single quotes'",
"owner": "cloud_admin",
},
]
@@ -77,7 +52,6 @@ def test_compute_catalog(neon_simple_env: NeonEnv):
**{
"skip_pg_catalog_updates": False,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
},
}
@@ -125,10 +99,10 @@ def test_compute_catalog(neon_simple_env: NeonEnv):
), f"Expected 404 status code, but got {e.response.status_code}"
def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
def test_compute_create_databases(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can create and work with databases and roles
with special characters (whitespaces, %, tabs, etc.) in the name.
Test that compute_ctl can create and work with databases with special
characters (whitespaces, %, tabs, etc.) in the name.
"""
env = neon_simple_env
@@ -142,7 +116,6 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
**{
"skip_pg_catalog_updates": False,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
},
}
@@ -166,43 +139,6 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
assert len(curr_db) == 1
assert curr_db[0] == db["name"]
for role in TEST_ROLE_NAMES:
with endpoint.cursor() as cursor:
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = %s", (role["name"],))
catalog_role = cursor.fetchone()
assert catalog_role is not None
assert catalog_role[0] == role["name"]
delta_operations = []
for db in TEST_DB_NAMES:
delta_operations.append({"action": "delete_db", "name": db["name"]})
for role in TEST_ROLE_NAMES:
delta_operations.append({"action": "delete_role", "name": role["name"]})
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"roles": [],
"databases": [],
},
"delta_operations": delta_operations,
}
)
endpoint.reconfigure()
for db in TEST_DB_NAMES:
with endpoint.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", (db["name"],))
catalog_db = cursor.fetchone()
assert catalog_db is None
for role in TEST_ROLE_NAMES:
with endpoint.cursor() as cursor:
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = %s", (role["name"],))
catalog_role = cursor.fetchone()
assert catalog_role is None
def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
"""
@@ -214,19 +150,17 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
# stuff into the spec.json file.
endpoint = env.endpoints.create_start("main")
SUB_DB_NAME = "';subscriber_db $$ $x$ $;"
PUB_DB_NAME = "publisher_db"
TEST_DB_NAMES = [
{
"name": "neondb",
"owner": "cloud_admin",
},
{
"name": SUB_DB_NAME,
"name": "subscriber_db",
"owner": "cloud_admin",
},
{
"name": PUB_DB_NAME,
"name": "publisher_db",
"owner": "cloud_admin",
},
]
@@ -243,47 +177,47 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
)
endpoint.reconfigure()
# Connect to the PUB_DB_NAME and create a publication
with endpoint.cursor(dbname=PUB_DB_NAME) as cursor:
# connect to the publisher_db and create a publication
with endpoint.cursor(dbname="publisher_db") as cursor:
cursor.execute("CREATE PUBLICATION mypub FOR ALL TABLES")
cursor.execute("select pg_catalog.pg_create_logical_replication_slot('mysub', 'pgoutput');")
cursor.execute("CREATE TABLE t(a int)")
cursor.execute("INSERT INTO t VALUES (1)")
cursor.execute("CHECKPOINT")
# Connect to the SUB_DB_NAME and create a subscription
# Note that we need to create subscription with the following connstr:
connstr = endpoint.connstr(dbname=PUB_DB_NAME).replace("'", "''")
with endpoint.cursor(dbname=SUB_DB_NAME) as cursor:
# connect to the subscriber_db and create a subscription
# Note that we need to create subscription with
connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''")
with endpoint.cursor(dbname="subscriber_db") as cursor:
cursor.execute("CREATE TABLE t(a int)")
cursor.execute(
f"CREATE SUBSCRIPTION mysub CONNECTION '{connstr}' PUBLICATION mypub WITH (create_slot = false) "
f"CREATE SUBSCRIPTION mysub CONNECTION '{connstr}' PUBLICATION mypub WITH (create_slot = false) "
)
# Wait for the subscription to be active
# wait for the subscription to be active
logical_replication_sync(
endpoint,
endpoint,
"mysub",
sub_dbname=SUB_DB_NAME,
pub_dbname=PUB_DB_NAME,
sub_dbname="subscriber_db",
pub_dbname="publisher_db",
)
# Check that replication is working
with endpoint.cursor(dbname=SUB_DB_NAME) as cursor:
with endpoint.cursor(dbname="subscriber_db") as cursor:
cursor.execute("SELECT * FROM t")
rows = cursor.fetchall()
assert len(rows) == 1
assert rows[0][0] == 1
# Drop the SUB_DB_NAME from the list
# drop the subscriber_db from the list
TEST_DB_NAMES_NEW = [
{
"name": "neondb",
"owner": "cloud_admin",
},
{
"name": PUB_DB_NAME,
"name": "publisher_db",
"owner": "cloud_admin",
},
]
@@ -296,7 +230,7 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
"databases": TEST_DB_NAMES_NEW,
},
"delta_operations": [
{"action": "delete_db", "name": SUB_DB_NAME},
{"action": "delete_db", "name": "subscriber_db"},
# also test the case when we try to delete a non-existent database
# shouldn't happen in normal operation,
# but can occur when failed operations are retried
@@ -305,22 +239,22 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
}
)
logging.info(f"Reconfiguring the endpoint to drop the {SUB_DB_NAME} database")
logging.info("Reconfiguring the endpoint to drop the subscriber_db")
endpoint.reconfigure()
# Check that the SUB_DB_NAME is dropped
# Check that the subscriber_db is dropped
with endpoint.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", (SUB_DB_NAME,))
cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", ("subscriber_db",))
catalog_db = cursor.fetchone()
assert catalog_db is None
# Check that we can still connect to the PUB_DB_NAME
with endpoint.cursor(dbname=PUB_DB_NAME) as cursor:
# Check that we can still connect to the publisher_db
with endpoint.cursor(dbname="publisher_db") as cursor:
cursor.execute("SELECT * FROM current_database()")
curr_db = cursor.fetchone()
assert curr_db is not None
assert len(curr_db) == 1
assert curr_db[0] == PUB_DB_NAME
assert curr_db[0] == "publisher_db"
def test_compute_drop_role(neon_simple_env: NeonEnv):
@@ -331,7 +265,6 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
"""
env = neon_simple_env
TEST_DB_NAME = "db_with_permissions"
TEST_GRANTEE = "'); MALFORMED SQL $$ $x$ $/;5%$ %I"
endpoint = env.endpoints.create_start("main")
@@ -368,18 +301,16 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
cursor.execute("create view test_view as select * from test_table")
with endpoint.cursor(dbname=TEST_DB_NAME, user="neon") as cursor:
cursor.execute(f'create role "{TEST_GRANTEE}"')
cursor.execute("create role readonly")
# We (`compute_ctl`) make 'neon' the owner of schema 'public' in the owned database.
# Postgres has all sorts of permissions and grants that we may not handle well,
# but this is the shortest repro grant for the issue
# https://github.com/neondatabase/cloud/issues/13582
cursor.execute(f'grant select on all tables in schema public to "{TEST_GRANTEE}"')
cursor.execute("grant select on all tables in schema public to readonly")
# Check that role was created
with endpoint.cursor() as cursor:
cursor.execute(
"SELECT rolname FROM pg_roles WHERE rolname = %(role)s", {"role": TEST_GRANTEE}
)
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = 'readonly'")
role = cursor.fetchone()
assert role is not None
@@ -387,8 +318,7 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
# that may block our ability to drop the role.
with endpoint.cursor(dbname=TEST_DB_NAME) as cursor:
cursor.execute(
"select grantor from information_schema.role_table_grants where grantee = %(grantee)s",
{"grantee": TEST_GRANTEE},
"select grantor from information_schema.role_table_grants where grantee = 'readonly'"
)
res = cursor.fetchall()
assert len(res) == 2, f"Expected 2 table grants, got {len(res)}"
@@ -402,7 +332,7 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
"delta_operations": [
{
"action": "delete_role",
"name": TEST_GRANTEE,
"name": "readonly",
},
],
}
@@ -411,9 +341,7 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
# Check that role is dropped
with endpoint.cursor() as cursor:
cursor.execute(
"SELECT rolname FROM pg_roles WHERE rolname = %(role)s", {"role": TEST_GRANTEE}
)
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = 'readonly'")
role = cursor.fetchone()
assert role is None

View File

@@ -7,7 +7,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.utils import run_only_on_default_postgres, wait_until
from fixtures.utils import wait_until
def check_client(env: NeonEnv, client: PageserverHttpClient):
@@ -138,25 +138,3 @@ def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilde
with env.pageserver.http_client(auth_token=pageserver_token) as client:
check_client(env, client)
@run_only_on_default_postgres("it does not use any postgres functionality")
def test_pageserver_http_index_part_force_patch(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
with env.pageserver.http_client() as client:
client.timeline_patch_index_part(
tenant_id,
timeline_id,
{"rel_size_migration": "migrating"},
)
assert client.timeline_detail(tenant_id, timeline_id)["rel_size_migration"] == "migrating"
# This is invalid in practice: we should never rollback the migrating state to legacy.
# But we do it here to test the API.
client.timeline_patch_index_part(
tenant_id,
timeline_id,
{"rel_size_migration": "legacy"},
)
assert client.timeline_detail(tenant_id, timeline_id)["rel_size_migration"] == "legacy"

View File

@@ -955,17 +955,6 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
raise RuntimeError(f"No heatmap for timeline: {tlid}")
def count_timeline_heatmap_layers(tlid) -> tuple[int, int]:
cold, hot = 0, 0
layers = timeline_heatmap(tlid)["layers"]
for layer in layers:
if layer["cold"]:
cold += 1
else:
hot += 1
return cold, hot
env.storage_controller.allowed_errors.extend(
[
".*Timed out.*downloading layers.*",
@@ -999,19 +988,13 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
)
def all_layers_downloaded(node, expected_layer_count: int):
local_layers_count = len(node.list_layers(tenant_id, timeline_id))
def all_layers_downloaded(expected_layer_count: int):
local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id))
log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}")
assert local_layers_count >= expected_layer_count
def no_layers_downloaded(node):
local_layers_count = len(node.list_layers(tenant_id, timeline_id))
log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}")
assert local_layers_count == 0
wait_until(lambda: all_layers_downloaded(ps_secondary, after_migration_heatmap_layers_count))
wait_until(lambda: all_layers_downloaded(after_migration_heatmap_layers_count))
# Read everything and make sure that we're not downloading anything extra.
# All hot layers should be available locally now.
@@ -1064,35 +1047,13 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
wait_until(lambda: check_archival_state(TimelineArchivalState.UNARCHIVED, child_timeline_id))
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
log.info(f"Parent timeline heatmap size: {len(timeline_heatmap(timeline_id)['layers'])}")
log.info(f"Child timeline heatmap size: {len(timeline_heatmap(child_timeline_id)['layers'])}")
parent_cold, parent_hot = count_timeline_heatmap_layers(timeline_id)
child_cold, child_hot = count_timeline_heatmap_layers(child_timeline_id)
log.info(f"Parent timeline heatmap size: cold={parent_cold}, hot={parent_hot}")
log.info(f"Child timeline heatmap size: cold={child_cold}, hot={child_hot}")
# All layers in the heatmap should come from the generation on unarchival.
# Hence, they should be cold.
assert parent_cold > 0
assert parent_hot == 0
expected_locally = parent_cold
expected_locally = len(timeline_heatmap(timeline_id)["layers"])
assert expected_locally > 0
env.storage_controller.download_heatmap_layers(
TenantShardId(tenant_id, shard_number=0, shard_count=0), child_timeline_id, recurse=True
)
wait_until(lambda: all_layers_downloaded(ps_secondary, expected_locally))
for ps in env.pageservers:
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "off")])
# The uploaded heatmap is still empty. Clean up all layers on the secondary.
ps_attached.http_client().tenant_secondary_download(tenant_id, wait_ms=100)
wait_until(lambda: no_layers_downloaded(ps_attached))
# Upload a new heatmap. The previously cold layers become hot since they're now resident.
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
# Warm up the current secondary.
ps_attached.http_client().tenant_secondary_download(tenant_id, wait_ms=100)
wait_until(lambda: all_layers_downloaded(ps_secondary, expected_locally))
wait_until(lambda: all_layers_downloaded(expected_locally))