Compare commits

..

115 Commits

Author SHA1 Message Date
github-actions[bot]
c1de242c1c Storage release 2025-07-18 06:11 UTC 2025-07-18 06:11:53 +00:00
Vlad Lazar
88cf0c04d5 Storage release 2025-07-11 06:11 UTC 2025-07-11 15:04:44 +01:00
Vlad Lazar
c19e1e76ac pageserver: log only on final shard resolution failure (#12565)
This log is too noisy. Instead of warning on every retry, let's log only
on the final failure.
2025-07-11 15:04:25 +01:00
github-actions[bot]
a72eaf701e Storage release 2025-07-04 06:11 UTC 2025-07-04 06:11:44 +00:00
Vlad Lazar
8693b85986 Storage release 2025-07-01 07:57 UTC 2025-07-01 09:57:07 +02:00
Arpad Müller
29cf6a36c2 detach_ancestor: delete the right layer when hardlink fails (#12397)
If a hardlink operation inside `detach_ancestor` fails due to the layer
already existing, we delete the layer to make sure the source is one we
know about, and then retry.

But we deleted the wrong file, namely, the one we wanted to use as the
source of the hardlink. As a result, the follow up hard link operation
failed. Our PR corrects this mistake.
2025-07-01 09:57:07 +02:00
Vlad Lazar
4beca2e639 Storage release 2025-06-27 06:11 UTC 2025-06-27 17:37:32 +02:00
Arpad Müller
103c866a90 Fix hang deleting offloaded timelines (#12366)
We don't have cancellation support for timeline deletions. In other
words, timeline deletion might still go on in an older generation while
we are attaching it in a newer generation already, because the
cancellation simply hasn't reached the deletion code.

This has caused us to hit a situation with offloaded timelines in which
the timeline was in an unrecoverable state: always returning an accepted
response, but never a 404 like it should be.

The detailed description can be found in
[here](https://github.com/neondatabase/cloud/issues/30406#issuecomment-3008667859)
(private repo link).

TLDR:

1. we ask to delete timeline on old pageserver/generation, starts
process in background
2. the storcon migrates the tenant to a different pageserver.
- during attach, the pageserver still finds an index part, so it adds it
to `offloaded_timelines`
4. the timeline deletion finishes, removing the index part in S3
5. there is a retry of the timeline deletion endpoint, sent to the new
pageserver location. it is bound to fail however:
- as the index part is gone, we print `Timeline already deleted in
remote storage`.
- the problem is that we then return an accepted response code, and not
a 404.
- this confuses the code calling us. it thinks the timeline is not
deleted, so keeps retrying.
- this state never gets recovered from until a reset/detach, because of
the `offloaded_timelines` entry staying there.

This is where this PR fixes things: if no index part can be found, we
can safely assume that the timeline is gone in S3 (it's the last thing
to be deleted), so we can remove it from `offloaded_timelines` and
trigger a reupload of the manifest. Subsequent retries will pick that
up.

Why not improve the cancellation support? It is a more disruptive code
change, that might have its own risks. So we don't do it for now.

Fixes https://github.com/neondatabase/cloud/issues/30406
2025-06-27 17:36:01 +02:00
github-actions[bot]
38c73bea87 Storage release 2025-06-20 12:32 UTC 2025-06-20 12:32:41 +00:00
github-actions[bot]
345662cbc2 Storage release 2025-06-20 06:11 UTC 2025-06-20 06:11:33 +00:00
github-actions[bot]
e23726f31c Storage release 2025-06-13 07:48 UTC 2025-06-13 07:48:37 +00:00
Alex Chi Z
e52313d216 Storage release 2025-06-06 10:55 UTC 2025-06-06 22:23:10 +08:00
Conrad Ludgate
6585f71137 [proxy] separate compute connect from compute authentication (#12145)
## Problem

PGLB/Neonkeeper needs to separate the concerns of connecting to compute,
and authenticating to compute.

Additionally, the code within `connect_to_compute` is rather messy,
spending effort on recovering the authentication info after
wake_compute.

## Summary of changes

Split `ConnCfg` into `ConnectInfo` and `AuthInfo`. `wake_compute` only
returns `ConnectInfo` and `AuthInfo` is determined separately from the
`handshake`/`authenticate` process.

Additionally, `ConnectInfo::connect_raw` is in-charge or establishing
the TLS connection, and the `postgres_client::Config::connect_raw` is
configured to use `NoTls` which will force it to skip the TLS
negotiation. This should just work.
2025-06-06 22:23:01 +08:00
Alexander Sarantcev
765b76f4cd storcon: Introduce deletion tombstones to support flaky node scenario (#12096)
## Problem

Removed nodes can re-add themselves on restart if not properly
tombstoned. We need a mechanism (e.g. soft-delete flag) to prevent this,
especially in cases where the node is unreachable.

More details there: #12036

## Summary of changes

- Introduced `NodeLifecycle` enum to represent node lifecycle states.
- Added a string representation of `NodeLifecycle` to the `nodes` table.
- Implemented node removal using a tombstone mechanism.
- Introduced `/debug/v1/tombstone*` handlers to manage the tombstone
state.
2025-06-06 22:23:01 +08:00
Erik Grinaker
72b09473c1 pageserver: move spawn_grpc to GrpcPageServiceHandler::spawn (#12147)
Mechanical move, no logic changes.
2025-06-06 22:23:01 +08:00
Alex Chi Z.
72a6d668b5 feat(build): add aws cli into the docker image (#12161)
## Problem

Makes it easier to debug AWS permission issues (i.e., storage scrubber)

## Summary of changes

Install awscliv2 into the docker image.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-06 22:23:01 +08:00
Alex Chi Z.
b5d7296e04 test(pageserver): ensure offload cleans up metrics (#12127)
Add a test to ensure timeline metrics are fully cleaned up after
offloading.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-06 22:23:01 +08:00
Arpad Müller
854de6d221 neon_local timeline import: create timelines on safekeepers (#12138)
neon_local's timeline import subcommand creates timelines manually, but
doesn't create them on the safekeepers. If a test then tries to open an
endpoint to read from the timeline, it will error in the new world with
`--timelines-onto-safekeepers`.

Therefore, if that flag is enabled, create the timelines on the
safekeepers.

Note that this import functionality is different from the fast import
feature (https://github.com/neondatabase/neon/issues/10188, #11801).

Part of #11670
As well as part of #11712
2025-06-06 22:23:01 +08:00
Alexey Kondratov
8dee1a7d0f feat(compute_ctl): Implement graceful compute monitor exit (#11911)
## Problem

After introducing a naive downtime calculation for the Postgres process
inside compute in https://github.com/neondatabase/neon/pull/11346, I
noticed that some amount of computes regularly report short downtime.
After checking some particular cases, it looks like all of them report
downtime close to the end of the life of the compute, i.e., when the
control plane calls a `/terminate` and we are waiting for Postgres to
exit.

Compute monitor also produces a lot of error logs because Postgres stops
accepting connections, but it's expected during the termination process.

## Summary of changes

Regularly check the compute status inside the main compute monitor loop
and exit gracefully when the compute is in some terminal or
soon-to-be-terminal state.

---------

Co-authored-by: Tristan Partin <tristan@neon.tech>
2025-06-06 22:23:01 +08:00
Dmitrii Kovalkov
b232c18441 pageserver, tests: prepare test_basebackup_cache for --timelines-onto-safekeepers (#12143)
## Problem
- `test_basebackup_cache` fails in
https://github.com/neondatabase/neon/pull/11712 because after the
timelines on safekeepers are managed by storage controller, they do
contain proper start_lsn and the compute_ctl tool sends the first
basebackup request with this LSN.
- `Failed to prepare basebackup` log messages during timeline
initialization, because the timeline is not yet in the global timeline
map.

- Relates to https://github.com/neondatabase/cloud/issues/29353

## Summary of changes
- Account for `timeline_onto_safekeepers` storcon's option in the test.
- Do not trigger basebackup prepare during the timeline initialization.
2025-06-06 22:23:01 +08:00
a-masterov
f25da470c7 Configure the dynamic loader for the extension-tests image (#12142)
## Problem
The same problem, fixed in
https://github.com/neondatabase/neon/issues/11857, but for the image
`neon-extesions-test`
## Summary of changes
The config file was added to use our library
2025-06-06 22:23:01 +08:00
Erik Grinaker
7f1f5c8487 pagebench: add batch support (#12133)
## Problem

The new gRPC page service protocol supports client-side batches. The
current libpq protocol only does best-effort server-side batching.

To compare these approaches, Pagebench should support submitting
contiguous page batches, similar to how Postgres will submit them (e.g.
with prefetches or vectored reads).

## Summary of changes

Add a `--batch-size` parameter specifying the size of contiguous page
batches. One batch counts as 1 RPS and 1 queue depth.

For the libpq protocol, a batch is submitted as individual requests and
we rely on the server to batch them for us. This will give a realistic
comparison of how these would be processed in the wild (e.g. when
Postgres sends 100 prefetch requests).

This patch also adds some basic validation of responses.
2025-06-06 22:23:01 +08:00
Vlad Lazar
c01591ce61 pageserver: remove handling of vanilla protocol (#12126)
## Problem

We support two ingest protocols on the pageserver: vanilla and
interpreted.
Interpreted has been the only protocol in use for a long time.

## Summary of changes

* Remove the ingest handling of the vanilla protocol
* Remove tenant and pageserver configuration for it
* Update all tests that tweaked the ingest protocol

## Compatibility

Backward compatibility:
* The new pageserver version can read the existing pageserver
configuration and it will ignore the unknown field.
* When the tenant config is read from the storcon db or from the
pageserver disk, the extra field will be ignored.

Forward compatiblity:
* Both the pageserver config and the tenant config map missing fields to
their default value.

I'm not aware of any tenant level override that was made for this knob.
2025-06-06 22:23:01 +08:00
Konstantin Knizhnik
bf1e33b062 Replica promote (#12090)
## Problem

This PR is part of larger computes support activity:

https://www.notion.so/neondatabase/Larger-computes-114f189e00478080ba01e8651ab7da90

Epic: https://github.com/neondatabase/cloud/issues/19010

In case of planned node restart, we are going to 
1. create new read-only replica
2. capture LFC state at primary
3. use this state to prewarm replica
4. stop old primary
5. promote replica to primary

Steps 1-3 are currently implemented and support from compute side.
This PR provides compute level implementation of replica promotion.

Support replica promotion

## Summary of changes

Right now replica promotion is done in three steps:
1. Set safekeepers list (now it is empty for replica)
2. Call `pg_promote()` top promote replica
3. Update endpoint setting to that it ids not more treated as replica.

May be all this three steps should be done by some function in
compute_ctl. But right now this logic is only implement5ed in test.

Postgres submodules PRs:
https://github.com/neondatabase/postgres/pull/648
https://github.com/neondatabase/postgres/pull/649
https://github.com/neondatabase/postgres/pull/650
https://github.com/neondatabase/postgres/pull/651

---------

Co-authored-by: Matthias van de Meent <matthias@neon.tech>
Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-06 22:23:01 +08:00
Konstantin Knizhnik
62bbd2f723 Add query execution time histogram (#10050)
## Problem


It will be useful to understand what kind of queries our clients are
executed.
And one of the most important characteristic of query is query execution
time - at least it allows to distinguish OLAP and OLTP queries. Also
monitoring query execution time can help to detect problem with
performance (assuming that workload is more or less stable).

## Summary of changes

Add query execution time histogram.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-06 22:23:01 +08:00
Folke Behrens
3ca47bc37b proxy: Move PGLB-related modules into pglb root module. (#12144)
Split the modules responsible for passing data and connecting to compute
from auth and waking for PGLB.
This PR just moves files. The waking is going to get removed from pglb
after this.
2025-06-06 22:23:01 +08:00
Alex Chi Z.
852c210d69 feat(pageserver): report tenant properties to posthog (#12113)
## Problem

Part of https://github.com/neondatabase/neon/issues/11813

In PostHog UI, we need to create the properties before using them as a
filter. We report all variants automatically when we start the
pageserver. In the future, we can report all real tenants instead of
fake tenants (we do that now to save money + we don't need real tenants
in the UI).

## Summary of changes

* Collect `region`, `availability_zone`, `pageserver_id` properties and
use them in the feature evaluation.
* Report 10 fake tenants on each pageserver startup.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-06 22:23:01 +08:00
Conrad Ludgate
3c6a1f6a81 update proxy protocol parsing to not a rw wrapper (#12035)
## Problem

I believe in all environments we now specify either required/rejected
for proxy-protocol V2 as required. We no longer rely on the supported
flow. This means we no longer need to keep around read bytes incase
they're not in a header.

While I designed ChainRW to be fast (the hot path with an empty buffer
is very easy to branch predict), it's still unnecessary.

## Summary of changes

* Remove the ChainRW wrapper
* Refactor how we read the proxy-protocol header using read_exact.
Slightly worse perf but it's hardly significant.
* Don't try and parse the header if it's rejected.
2025-06-06 22:23:01 +08:00
Konstantin Knizhnik
d49d781689 Update online_advisor (#12045)
## Problem

Investigate crash of online_advisor in image check

## Summary of changes

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-06 22:23:01 +08:00
Arpad Müller
1e6278f1f3 pgxn: support generations in safekeepers_cmp (#12129)
`safekeepers_cmp` was added by #8840 to make changes of the safekeeper
set order independent: a `sk1,sk2,sk3` specifier changed to
`sk3,sk1,sk2` should not cause a walproposer restart. However, this
check didn't support generations, in the sense that it would see the
`g#123:` as part of the first safekeeper in the list, and if the first
safekeeper changes, it would also restart the walproposer.

Therefore, parse the generation properly and make it not be a part of
the generation.

This PR doesn't add a specific test, but I have confirmed locally that
`test_safekeepers_reconfigure_reorder` is fixed with the changes of PR
#11712 applied thanks to this PR.

Part of https://github.com/neondatabase/neon/issues/11670
2025-06-06 22:23:01 +08:00
Conrad Ludgate
cca40c62b7 compute-ctl: add spec for enable_tls, separate from compute-ctl config (#12109)
## Problem

Inbetween adding the TLS config for compute-ctl, and adding the TLS
config in controlplane, we switched from using a provision flag to a
bind flag. This happened to work in all of my testing in preview regions
as they have no VM pool, so each bind was also a provision. However, in
staging I found that the TLS config is still only processed during
provision, even though it's only sent on bind.

## Summary of changes

* Add a new feature flag value, `tls_experimental`, which tells
postgres/pgbouncer/local_proxy to use the TLS certificates on bind.
* compute_ctl on provision will be told where the certificates are,
instead of being told on bind.
2025-06-06 22:23:00 +08:00
Suhas Thalanki
aa84913318 compute: Add manifest.yml for default Postgres configuration settings (#11820)
Adds a `manifest.yml` file that contains the default settings for
compute. Currently, it comes from cplane code
[here](0cda3d4b01/goapp/controlplane/internal/pkg/compute/computespec/pg_settings.go (L110)).

Related RFC:
https://github.com/neondatabase/neon/blob/main/docs/rfcs/038-independent-compute-release.md

Related Issue: https://github.com/neondatabase/cloud/issues/11698
2025-06-06 22:23:00 +08:00
Tristan Partin
864910a8c5 Use Url::join() when creating the final remote extension URL (#12121)
Url::to_string() adds a trailing slash on the base URL, so when we did
the format!(), we were adding a double forward slash.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-06 22:23:00 +08:00
Erik Grinaker
c51514d427 pageserver: support get_vectored_concurrent_io with gRPC (#12131)
## Problem

The gRPC page service doesn't respect `get_vectored_concurrent_io` and
always uses sequential IO.

## Summary of changes

Spawn a sidecar task for concurrent IO when enabled.

Cancellation will be addressed separately.
2025-06-06 22:23:00 +08:00
a-masterov
18c40ceae9 Fix codestyle for compute.sh for docker-compose (#12128)
## Problem
The script `compute.sh` had a non-consistent coding style and didn't
follow best practices for modern bash scripts
## Summary of changes
The coding style was fixed to follow best practices.
2025-06-06 22:23:00 +08:00
Vlad Lazar
48dd8e2008 pageserver: make import job max byte range size configurable (#12117)
## Problem

We want to repro an OOM situation, but large partial reads are required.

## Summary of Changes

Make the max partial read size configurable for import jobs.
2025-06-06 22:23:00 +08:00
github-actions[bot]
00ba9ca40f Storage release 2025-05-30 17:04 UTC 2025-05-30 17:04:39 +00:00
github-actions[bot]
26923935d3 Storage release 2025-05-23 06:10 UTC 2025-05-23 06:11:00 +00:00
github-actions[bot]
8f98b823c7 Storage release 2025-05-16 06:11 UTC 2025-05-16 06:11:05 +00:00
Erik Grinaker
1821b6ea3f Storage release 2025-05-09 12:25 UTC 2025-05-09 14:25:07 +02:00
Erik Grinaker
be3686e2af Storage release 2025-05-06 15:12 UTC 2025-05-06 17:12:17 +02:00
Erik Grinaker
1c3cb18c60 storcon: fix split aborts removing other tenants (#11837)
## Problem

When aborting a split, the code accidentally removes all other tenant
shards from the in-memory map that have the same shard count as the
aborted split, causing "tenant not found" errors. It will recover on a
storcon restart, when it loads the persisted state. This issue has been
present for at least a year.

Resolves https://github.com/neondatabase/cloud/issues/28589.

## Summary of changes

Only remove shards belonging to the relevant tenant when aborting a
split.

Also adds a regression test.
2025-05-06 17:12:17 +02:00
github-actions[bot]
946a0667eb Storage release 2025-05-02 06:10 UTC 2025-05-02 06:10:54 +00:00
Vlad Lazar
e5d15450ec Storage release 2025-04-28 2025-04-28 22:43:18 +02:00
Alex Chi Z.
d294078d1f fix(pageserver): consider tombstones in replorigin (#11752)
## Problem

We didn't consider tombstones in replorigin read path in the past. This
was fine because tombstones are stored as LSN::Invalid before we
universally define what the tombstone is for sparse keyspaces.

Now we remove non-inherited keys during detach ancestor and write the
universal tombstone "empty image". So we need to consider it across all
the read paths.

related: https://github.com/neondatabase/neon/pull/11299

## Summary of changes

Empty value gets ignored for replorigin scans.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-28 21:20:11 +01:00
github-actions[bot]
eb70d7a55c Storage release 2025-04-25 2025-04-25 11:04:06 +00:00
github-actions[bot]
20723ea039 Storage release 2025-04-21 2025-04-21 19:39:21 +00:00
Vlad Lazar
db95540975 pageserver: handle empty get vectored queries (#11652)
## Problem

If all batched requests are excluded from the query by
`Timeine::get_rel_page_at_lsn_batched` (e.g. because they are past the
end of the relation), the read path would panic since it doesn't expect
empty queries. This is a change in behaviour that was introduced with
the scattered query implementation.

## Summary of Changes

Handle empty queries explicitly.
2025-04-21 15:38:44 -04:00
JC Grünhage
90033fe693 fix(ci): set token for fast-forward failure comments and allow merging with state unstable (#11647)
## Problem

https://github.com/neondatabase/neon/actions/runs/14538136318/job/40790985693?pr=11645
failed, even though the relevant parts of the CI had passed and
auto-merge determined the PR is ready to merge. After that, commenting
failed.

## Summary of changes
- set GH_TOKEN for commenting after fast-forward failure
- allow merging with mergeable_state unstable
2025-04-21 15:38:44 -04:00
JC Grünhage
cb9d439cc1 fix(ci): make regex to find rc branches less strict (#11646)
## Problem

https://github.com/neondatabase/neon/actions/runs/14537161022/job/40787763965
failed to find the correct RC PR run, preventing artifact re-use. This
broke in https://github.com/neondatabase/neon/pull/11547.

There's a hotfix release containing this in
https://github.com/neondatabase/neon/pull/11645.

## Summary of changes
Make the regex for finding the RC PR run less strict, it was needlessly
precise.
2025-04-21 15:38:44 -04:00
Jan Christian Grünhage
7a2f0c4d53 Storage release 2025-04-18 2025-04-18 18:51:56 +02:00
Jan Christian Grünhage
60c907cbdb fix(ci): allow merging with mergeable_state unstable 2025-04-18 18:51:27 +02:00
Jan Christian Grünhage
2eb43a5d81 fix(ci): set GH_TOKEN for commenting after fast-forward failure 2025-04-18 18:51:21 +02:00
Jan Christian Grünhage
b2f45fe37f fix(ci): make regex to find rc branches less strict 2025-04-18 17:15:01 +02:00
github-actions[bot]
3904a0fe4f Storage release 2025-04-18 2025-04-18 06:02:29 +00:00
github-actions[bot]
cd07b120b5 Storage release 2025-04-11 2025-04-11 16:20:12 +00:00
github-actions[bot]
7c06a33df0 Storage release 2025-04-04 2025-04-04 15:40:58 +00:00
github-actions[bot]
85992000e3 Storage release 2025-03-28 2025-03-28 06:02:32 +00:00
github-actions[bot]
c33cf739e3 Storage release 2025-03-16 2025-03-16 17:05:15 +00:00
JC Grünhage
1812fc7cf1 Merge pull request #11251 from neondatabase/rc/release/2025-03-14--storcon-hotfix
Storage controller hotfix 2025-03-14
2025-03-14 16:43:22 +01:00
Christian Schwarz
929d963a35 Merge branch 'hotfix/release/2025-03-14-storcon-optimizations' into rc/release/2025-03-14--storcon-hotfix 2025-03-14 15:06:25 +01:00
Christian Schwarz
e405420458 CHERRY PICK: fix(storcon): optimization validation makes decisions based on wrong SecondaryProgress (#11229)
(cherry picked from commit 04370b48b3)

Conflicts:
	storage_controller/src/service.rs

Because `release` head doesn't yet have
`storcon: timetime table, creation and deletion (#11058)`
2025-03-14 14:47:29 +01:00
Alex Chi Z.
1b5258ef6a Merge pull request #11161 from neondatabase/arpad/release-db-sk-loading
release branch: re-apply "storcon db: load safekeepers from DB again"
2025-03-10 21:50:58 -04:00
Arpad Müller
9fbb33e9d9 storcon db: load safekeepers from DB again (#11087)
Earlier PR #11041 soft-disabled the loading code for safekeepers from
the storcon db. This PR makes us load the safekeepers from the database
again, now that we have [JWTs available on
staging](https://github.com/neondatabase/neon/pull/11087) and soon on
prod.

This reverts commit 23fb8053c5.

Part of https://github.com/neondatabase/cloud/issues/24727
2025-03-11 01:12:21 +01:00
Alex Chi Z.
93983ac5fc Merge pull request #11128 from neondatabase/rc/release/2025-03-07
Storage release 2025-03-07
2025-03-07 14:02:55 -05:00
Alex Chi Z
72dd540c87 Merge branch 'release' of https://github.com/neondatabase/neon into rc/release/2025-03-07 2025-03-07 13:04:10 -05:00
Alex Chi Z.
612d0aea4f feat(pageserver): add force patch index_part API (#11119)
## Problem

As part of the disaster recovery tool. Partly for
https://github.com/neondatabase/neon/issues/9114.

## Summary of changes

* Add a new pageserver API to force patch the fields in index_part and
modify the timeline internal structures.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-07 12:54:44 -05:00
Vlad Lazar
aa3a75a0a7 pageserver: enable previous heatmaps by default (#11132)
We add the off by default configs in
https://github.com/neondatabase/neon/pull/11088 because
the unarchival heatmap was causing oversized secondary locations. That
was fixed in https://github.com/neondatabase/neon/pull/11098, so let's
turn them on by default.
2025-03-07 11:14:38 -05:00
Arpad Müller
6389c9184c update ring to 0.17.13 (#11131)
Update ring from 0.17.6 to 0.17.13. Addresses the advisory:
https://rustsec.org/advisories/RUSTSEC-2025-0009
2025-03-07 11:14:26 -05:00
Vlad Lazar
e177927476 safekeeper: don't skip empty records for shard zero (#11137)
## Problem

Shard zero needs to track the start LSN of the latest record
in adition to the LSN of the next record to ingest. The former
is included in basebackup persisted by the compute in WAL.

Previously, empty records were skipped for all shards. This caused
the prev LSN tracking on the PS to fall behind and led to logical
replication
issues.

## Summary of changes

Shard zero now receives emtpy interpreted records for LSN tracking
purposes.
A test is included too.
2025-03-07 11:04:30 -05:00
github-actions[bot]
e9bbafebbd Storage release 2025-03-07 2025-03-07 06:02:10 +00:00
Vlad Lazar
7430fb9836 Merge pull request #11090 from neondatabase/vlad/release-gate-previous-heatmap
Storage release 2025-03-05
2025-03-05 14:37:24 +00:00
Vlad Lazar
c45d169527 pageserver: gate previous heatmap behind config flag (#11088)
## Problem

On unarchival, we update the previous heatmap with all visible layers.
When the primary generates a new heatmap it includes all those layers,
so the secondary will download them. Since they're not actually resident
on the primary (we didn't call the warm up API), they'll never be
evicted, so they remain in the heatmap.

This leads to oversized secondary locations like we saw in pre-prod.

## Summary of changes

Gate the loading of the previous heatmaps and the heatmap generation on
unarchival behind configuration
flags. They are disabled by default, but enabled in tests.
2025-03-05 13:29:46 +01:00
Vlad Lazar
a1e67cfe86 Merge pull request #11051 from neondatabase/vlad/release-8005-and-cherry-picks
Storage release 2025-02-28
2025-02-28 19:40:33 +00:00
Erik Grinaker
0263c92c47 pageserver: fix race that can wedge background tasks (#11047)
## Problem

`wait_for_active_tenant()`, used when starting background tasks, has a
race condition that can cause it to wait forever (until cancelled). It
first checks the current tenant state, and then subscribes for state
updates, but if the state changes between these then it won't be
notified about it.

We've seen this wedge compaction tasks, which can cause unbounded layer
file buildup and read amplification.

## Summary of changes

Use `watch::Receiver::wait_for()` to check both the current and new
tenant states.
2025-02-28 18:11:10 +01:00
Vlad Lazar
5fc599d653 storcon: soft disable SK heartbeats (#11041)
## Problem

JWT tokens aren't in place, so all SK heartbeats fail. This is
equivalent to a wait before applying the PS heartbeats and makes things
more flaky.

## Summary of Changes

Add a flag that skips loading SKs from the db on start-up and at
runtime.
2025-02-28 18:11:10 +01:00
JC Grünhage
66d2592d04 Merge pull request #11032 from neondatabase/rc/release/2025-02-28
Storage release 2025-02-28
2025-02-28 11:12:07 +01:00
Jan Christian Grünhage
517ae7a60e Merge remote-tracking branch 'origin/release' into rc/release/2025-02-28 2025-02-28 10:10:19 +01:00
github-actions[bot]
b2a769cc86 Storage release 2025-02-28 2025-02-28 06:02:09 +00:00
Erik Grinaker
12f0e525c6 Merge pull request #10961 from neondatabase/erik/release-7930-slow-getpage
pageserver: tweak slow GetPage logging (#10956)
2025-02-24 23:05:10 +01:00
Erik Grinaker
6c6e5bfc2b pageserver: tweak slow GetPage logging 2025-02-24 21:27:38 +01:00
Erik Grinaker
9c0fefee25 Merge pull request #10921 from neondatabase/rc/release/2025-02-21
Storage release 2025-02-21
2025-02-21 18:55:58 +01:00
Vlad Lazar
97e2e27f68 storcon: use Duration for duration's in the storage controller tenant config (#10928)
## Problem

The storage controller treats durations in the tenant config as strings.
These are loaded from the db.
The pageserver maps these durations to a seconds only format and we
always get a mismatch compared
to what's in the db.

## Summary of changes

Treat durations as durations inside the storage controller and not as
strings.
Nothing changes in the cross service API's themselves or the way things
are stored in the db.

I also added some logging which I would have made the investigation a
10min job:
1. Reason for why the reconciliation was spawned
2. Location config diff between the observed and wanted states
2025-02-21 17:14:43 +01:00
Erik Grinaker
cea2b222d0 Merge branch 'release' into rc/release/2025-02-21 2025-02-21 12:38:14 +01:00
github-actions[bot]
5b2afd953c Storage release 2025-02-21 2025-02-21 06:02:10 +00:00
Alex Chi Z.
74e789b155 Merge pull request #10878 from neondatabase/rc/release/2025-02-18 2025-02-18 23:04:21 -05:00
Alex Chi Z.
235439e639 fix(pageserver): make repartition error critical (#10872)
## Problem

Read errors during repartition should be a critical error.

## Summary of changes

<del>We only have one call site</del> We have two call sites of
`repartition` where one of them is during the initial image upload
optimization and another is during image layer creation, so I added a
`critical!` here instead of inside `collect_keyspace`.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-18 15:29:19 -05:00
Alex Chi Z.
a78438f15c Revert "feat(pageserver): repartition on L0-L1 boundary (#10548)" (#10870)
This reverts commit 443c8d0b4b.

## Problem

We observe a massive amount of compaction errors.

## Summary of changes

If the tenant did not write any L1 layers (i.e., they accumulate L0
layers where number of them is below L0 threshold), image creation will
always fail. Therefore, it's not correct to simply use the
disk_consistent_lsn or L0/L1 boundary for the image creation.
2025-02-18 13:39:01 -05:00
Arseny Sher
3d370679a1 Merge pull request #10855 from neondatabase/rel-02-14-39d42d846ae38
Cherry-pick #10845 into release
2025-02-17 18:46:22 +03:00
John Spray
c37e3020ab pageserver_api: fix decoding old-version TimelineInfo (#10845)
## Problem

In #10707 some new fields were introduced in TimelineInfo.

I forgot that we do not only use TimelineInfo for encoding, but also
decoding when the storage controller calls into a pageserver, so this
broke some calls from controller to pageserver while in a mixed-version
state.

## Summary of changes

- Make new fields have default behavior so that they are optional
2025-02-17 18:43:14 +03:00
Arseny Sher
a036708da1 Merge pull request #10820 from neondatabase/rc/release/2025-02-14
Storage release 2025-02-14
2025-02-14 19:36:36 +03:00
John Spray
1e7ad80ee7 storage controller: prioritize reconciles for user-facing operations (#10822)
## Problem

Some situations may produce a large number of pending reconciles. If we
experience an issue where reconciles are processed more slowly than
expected, that can prevent us responding promptly to user requests like
tenant/timeline CRUD.

This is a cleaner implementation of the hotfix in
https://github.com/neondatabase/neon/pull/10815

## Summary of changes

- Introduce a second semaphore for high priority tasks, with
configurable units (default 256). The intent is that in practical
situations these user-facing requests should never have to wait.
- Use the high priority semaphore for: tenant/timeline CRUD, and shard
splitting operations. Use normal priority for everything else.
2025-02-14 17:34:51 +03:00
John Spray
581be23100 storcon: fix eliding parameters from proxied URL labels (#10817)
## Problem

We had code for stripping IDs out of proxied paths to reduce cardinality
of metrics, but it was only stripping out tenant IDs, and leaving in
timeline IDs and query parameters (e.g. LSN in lsn->timestamp lookups).

## Summary of changes

- Use a more general regex approach.

There is still some risk that a future pageserver API might include a
parameter in `/the/path/`, but we control that API and it is not often
extended. We will also alert on metrics cardinality in staging so that
if we made that mistake we would notice.
2025-02-14 17:34:25 +03:00
github-actions[bot]
8ca7ea859d Storage release 2025-02-14 2025-02-14 06:02:05 +00:00
Christian Schwarz
efea8223bb Merge pull request #10774 from neondatabase/releases/2025-02-11-smgr-op-latency-metrics-hotfix 2025-02-11 21:16:44 +01:00
Christian Schwarz
d3d3bfc6d0 fix(page_service / batching): smgr op latency metric of dropped responses include flush time (#10756)
# Problem

Say we have a batch of 10 responses to send out.

Then, even with

- #10728

we've still only called observe_execution_end_flush_start for the first
3 responses.

The remaining 7 response timers are still ticking.

When compute now closes the connection, the waiting flush fails with an
error and we `drop()` the remaining 7 responses' smgr op timers. The
`impl Drop for SmgrOpTimer` will observe an execution time that includes
the flush time.

In practice, this is supsected to produce the `+Inf` observations in the
smgr op latency histogram we've seen since the introduction of
pipelining, even after shipping #10728.

refs:
- fixup of https://github.com/neondatabase/neon/pull/10042
- fixup of https://github.com/neondatabase/neon/pull/10728
- fixes https://github.com/neondatabase/neon/issues/10754
2025-02-11 20:25:13 +01:00
Christian Schwarz
b3a911ff8c fix(page_service / batching): smgr op latency metrics includes the flush time of preceding requests (#10728)
Before this PR, if a batch contains N responses, the smgr op latency
reported for response (N-i) would include the time we spent flushing
the preceding requests.

refs:
- fixup of https://github.com/neondatabase/neon/pull/10042
- fixes https://github.com/neondatabase/neon/issues/10674
2025-02-11 20:25:08 +01:00
John Spray
a54853abd5 Merge pull request #10712 from neondatabase/rc/release/2025-02-07
Storage release 2025-02-07
2025-02-07 18:21:13 +00:00
Arpad Müller
69007f7ac8 Revert recent AWS SDK update (#10724)
We've been seeing some regressions in staging since the AWS SDK updates:
https://github.com/neondatabase/neon/issues/10695 . We aren't sure the
regression was caused by the SDK update, but the issues do involve S3,
so it's not unlikely. By reverting the SDK update we find out whether it
was really the SDK update, or something else.

Reverts the two PRs:

* https://github.com/neondatabase/neon/pull/10588
* https://github.com/neondatabase/neon/pull/10699

https://neondb.slack.com/archives/C08C2G15M6U/p1738576986047179
2025-02-07 18:18:45 +00:00
github-actions[bot]
d255fa4b7e Storage release 2025-02-07 2025-02-07 06:02:18 +00:00
Arpad Müller
40d6b3a34e Merge pull request #10602 from neondatabase/rc/release/2025-01-31
Storage release 2025-01-31
2025-02-03 16:43:04 +01:00
github-actions[bot]
a018878e27 Storage release 2025-01-31 2025-01-31 06:02:08 +00:00
Christian Schwarz
e5b3eb1e64 Merge pull request #10500 from neondatabase/rc/release/2025-01-24
Storage release 2025-01-24
2025-01-25 00:54:56 +01:00
github-actions[bot]
f35e1356a1 Storage release 2025-01-24 2025-01-24 06:02:13 +00:00
Christian Schwarz
4dec0dddc6 Merge pull request #10447 from neondatabase/releases/2025-01-20-hotfix
Release: storage hotfix 2025-01-20
2025-01-20 15:55:44 +01:00
Christian Schwarz
e0c504af38 fix(page_service / handle): panic when parallel client disconnect & Timeline shutdown
Refs
- fixes https://github.com/neondatabase/neon/issues/10444
2025-01-20 14:37:16 +01:00
Alex Chi Z.
3399eea2ed Merge pull request #10436 from neondatabase/rc/release/2025-01-17
Storage release 2025-01-17
2025-01-17 12:36:17 -05:00
Alex Chi Z
6a29c809d5 Merge branch 'release' of https://github.com/neondatabase/neon into rc/release/2025-01-17 2025-01-17 10:44:25 -05:00
github-actions[bot]
a62c01df4c Storage release 2025-01-17 2025-01-17 06:02:11 +00:00
Vlad Lazar
4c093c6314 Merge pull request #10338 from neondatabase/rc/release/2025-01-10
Storage release 2025-01-10
2025-01-10 19:21:43 +00:00
github-actions[bot]
32f58f8228 Storage release 2025-01-10 2025-01-10 06:02:00 +00:00
Erik Grinaker
96c36c0894 Merge pull request #10263 from neondatabase/rc/release/2025-01-03
Storage release 2025-01-03
2025-01-03 20:32:37 +01:00
Erik Grinaker
d719709316 Revert "pageserver: revert flush backpressure (#8550) (#10135)" (#10270)
This reverts commit f3ecd5d76a.

It is
[suspected](https://neondb.slack.com/archives/C033RQ5SPDH/p1735907405716759)
to have caused significant read amplification in the [ingest
benchmark](https://neonprod.grafana.net/d/de3mupf4g68e8e/perf-test3a-ingest-benchmark?orgId=1&from=now-30d&to=now&timezone=utc&var-new_project_endpoint_id=ep-solitary-sun-w22bmut6&var-large_tenant_endpoint_id=ep-holy-bread-w203krzs)
(specifically during index creation).

We will revisit an intermediate improvement here to unblock [upload
parallelism](https://github.com/neondatabase/neon/issues/10096) before
properly addressing [compaction
backpressure](https://github.com/neondatabase/neon/issues/8390).
2025-01-03 16:51:16 +01:00
Erik Grinaker
97912f19fc pageserver,safekeeper: disable heap profiling (#10268)
## Problem

Since enabling continuous profiling in staging, we've seen frequent seg
faults. This is suspected to be because jemalloc and pprof-rs take a
stack trace at the same time, and the handlers aren't signal safe.
jemalloc does this probabilistically on every allocation, regardless of
whether someone is taking a heap profile, which means that any CPU
profile has a chance to cause a seg fault.

Touches #10225.

## Summary of changes

For now, just disable heap profiles -- CPU profiles are more important,
and we need to be able to take them without risking a crash.
2025-01-03 16:51:16 +01:00
github-actions[bot]
49724aa3b6 Storage release 2025-01-03 2025-01-03 06:02:03 +00:00
387 changed files with 5156 additions and 16909 deletions

View File

@@ -21,14 +21,13 @@ platforms = [
# "x86_64-apple-darwin",
# "x86_64-pc-windows-msvc",
]
[final-excludes]
workspace-members = [
# vm_monitor benefits from the same Cargo.lock as the rest of our artifacts, but
# it is built primarly in separate repo neondatabase/autoscaling and thus is excluded
# from depending on workspace-hack because most of the dependencies are not used.
"vm_monitor",
# subzero-core is a stub crate that should be excluded from workspace-hack
"subzero-core",
# All of these exist in libs and are not usually built independently.
# Putting workspace hack there adds a bottleneck for cargo builds.
"compute_api",

View File

@@ -31,7 +31,7 @@ config-variables:
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- PREWARM_PROJECT_ID
- PREWARM_PGBENCH_SIZE
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID

View File

@@ -1,28 +0,0 @@
name: 'Prepare current job for subzero'
description: >
Set git token to access `neondatabase/subzero` from cargo build,
and set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable to use git CLI
inputs:
token:
description: 'GitHub token with access to neondatabase/subzero'
required: true
runs:
using: "composite"
steps:
- name: Set git token for neondatabase/subzero
uses: pyTooling/Actions/with-post-step@2307b526df64d55e95884e072e49aac2a00a9afa # v5.1.0
env:
SUBZERO_ACCESS_TOKEN: ${{ inputs.token }}
with:
main: |
git config --global url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a
post: |
git config --global --unset url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
- name: Set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable
shell: bash -euxo pipefail {0}
run: echo "CARGO_NET_GIT_FETCH_WITH_CLI=true" >> ${GITHUB_ENV}

View File

@@ -86,10 +86,6 @@ jobs:
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Set pg 14 revision for caching
id: pg_v14_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
@@ -120,7 +116,7 @@ jobs:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ inputs.sanitizers }}
run: |
CARGO_FLAGS="--locked --features testing,rest_broker"
CARGO_FLAGS="--locked --features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_PROFILE=""

View File

@@ -46,10 +46,6 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Cache cargo deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0

View File

@@ -1,384 +0,0 @@
name: TPC-C like benchmark using benchbase
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 6 * * *' # run once a day at 6 AM UTC
workflow_dispatch: # adds ability to run this manually
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow globally because we do not want to be too noisy in production environment
group: benchbase-tpcc-workflow
cancel-in-progress: false
permissions:
contents: read
jobs:
benchbase-tpcc:
strategy:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
- warehouses: 50 # defines number of warehouses and is used to compute number of terminals
max_rate: 800 # measured max TPS at scale factor based on experiments. Adjust if performance is better/worse
min_cu: 0.25 # simulate free tier plan (0.25 -2 CU)
max_cu: 2
- warehouses: 500 # serverless plan (2-8 CU)
max_rate: 2000
min_cu: 2
max_cu: 8
- warehouses: 1000 # business plan (2-16 CU)
max_rate: 2900
min_cu: 2
max_cu: 16
max-parallel: 1 # we want to run each workload size sequentially to avoid noisy neighbors
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PG_CONFIG: /tmp/neon/pg_install/v17/bin/pg_config
PSQL: /tmp/neon/pg_install/v17/bin/psql
PG_17_LIB_PATH: /tmp/neon/pg_install/v17/lib
POSTGRES_VERSION: 17
runs-on: [ self-hosted, us-east-2, x64 ]
timeout-minutes: 1440
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials # necessary to download artefacts
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours is currently max associated with IAM role
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
id: create-neon-project-tpcc
uses: ./.github/actions/neon-project-create
with:
region_id: aws-us-east-2
postgres_version: ${{ env.POSTGRES_VERSION }}
compute_units: '[${{ matrix.min_cu }}, ${{ matrix.max_cu }}]'
api_key: ${{ secrets.NEON_PRODUCTION_API_KEY_4_BENCHMARKS }}
api_host: console.neon.tech # production (!)
- name: Initialize Neon project
env:
BENCHMARK_TPCC_CONNSTR: ${{ steps.create-neon-project-tpcc.outputs.dsn }}
PROJECT_ID: ${{ steps.create-neon-project-tpcc.outputs.project_id }}
run: |
echo "Initializing Neon project with project_id: ${PROJECT_ID}"
export LD_LIBRARY_PATH=${PG_17_LIB_PATH}
# Retry logic for psql connection with 1 minute sleep between attempts
for attempt in {1..3}; do
echo "Attempt ${attempt}/3: Creating extensions in Neon project"
if ${PSQL} "${BENCHMARK_TPCC_CONNSTR}" -c "CREATE EXTENSION IF NOT EXISTS neon; CREATE EXTENSION IF NOT EXISTS neon_utils;"; then
echo "Successfully created extensions"
break
else
echo "Failed to create extensions on attempt ${attempt}"
if [ ${attempt} -lt 3 ]; then
echo "Waiting 60 seconds before retry..."
sleep 60
else
echo "All attempts failed, exiting"
exit 1
fi
fi
done
echo "BENCHMARK_TPCC_CONNSTR=${BENCHMARK_TPCC_CONNSTR}" >> $GITHUB_ENV
- name: Generate BenchBase workload configuration
env:
WAREHOUSES: ${{ matrix.warehouses }}
MAX_RATE: ${{ matrix.max_rate }}
run: |
echo "Generating BenchBase configs for warehouses: ${WAREHOUSES}, max_rate: ${MAX_RATE}"
# Extract hostname and password from connection string
# Format: postgresql://username:password@hostname/database?params (no port for Neon)
HOSTNAME=$(echo "${BENCHMARK_TPCC_CONNSTR}" | sed -n 's|.*://[^:]*:[^@]*@\([^/]*\)/.*|\1|p')
PASSWORD=$(echo "${BENCHMARK_TPCC_CONNSTR}" | sed -n 's|.*://[^:]*:\([^@]*\)@.*|\1|p')
echo "Extracted hostname: ${HOSTNAME}"
# Use runner temp (NVMe) as working directory
cd "${RUNNER_TEMP}"
# Copy the generator script
cp "${GITHUB_WORKSPACE}/test_runner/performance/benchbase_tpc_c_helpers/generate_workload_size.py" .
# Generate configs and scripts
python3 generate_workload_size.py \
--warehouses ${WAREHOUSES} \
--max-rate ${MAX_RATE} \
--hostname ${HOSTNAME} \
--password ${PASSWORD} \
--runner-arch ${{ runner.arch }}
# Fix path mismatch: move generated configs and scripts to expected locations
mv ../configs ./configs
mv ../scripts ./scripts
- name: Prepare database (load data)
env:
WAREHOUSES: ${{ matrix.warehouses }}
run: |
cd "${RUNNER_TEMP}"
echo "Loading ${WAREHOUSES} warehouses into database..."
# Run the loader script and capture output to log file while preserving stdout/stderr
./scripts/load_${WAREHOUSES}_warehouses.sh 2>&1 | tee "load_${WAREHOUSES}_warehouses.log"
echo "Database loading completed"
- name: Run TPC-C benchmark (warmup phase, then benchmark at 70% of configuredmax TPS)
env:
WAREHOUSES: ${{ matrix.warehouses }}
run: |
cd "${RUNNER_TEMP}"
echo "Running TPC-C benchmark with ${WAREHOUSES} warehouses..."
# Run the optimal rate benchmark
./scripts/execute_${WAREHOUSES}_warehouses_opt_rate.sh
echo "Benchmark execution completed"
- name: Run TPC-C benchmark (warmup phase, then ramp down TPS and up again in 5 minute intervals)
env:
WAREHOUSES: ${{ matrix.warehouses }}
run: |
cd "${RUNNER_TEMP}"
echo "Running TPC-C ramp-down-up with ${WAREHOUSES} warehouses..."
# Run the optimal rate benchmark
./scripts/execute_${WAREHOUSES}_warehouses_ramp_up.sh
echo "Benchmark execution completed"
- name: Process results (upload to test results database and generate diagrams)
env:
WAREHOUSES: ${{ matrix.warehouses }}
MIN_CU: ${{ matrix.min_cu }}
MAX_CU: ${{ matrix.max_cu }}
PROJECT_ID: ${{ steps.create-neon-project-tpcc.outputs.project_id }}
REVISION: ${{ github.sha }}
PERF_DB_CONNSTR: ${{ secrets.PERF_TEST_RESULT_CONNSTR }}
run: |
cd "${RUNNER_TEMP}"
echo "Creating temporary Python environment for results processing..."
# Create temporary virtual environment
python3 -m venv temp_results_env
source temp_results_env/bin/activate
# Install required packages in virtual environment
pip install matplotlib pandas psycopg2-binary
echo "Copying results processing scripts..."
# Copy both processing scripts
cp "${GITHUB_WORKSPACE}/test_runner/performance/benchbase_tpc_c_helpers/generate_diagrams.py" .
cp "${GITHUB_WORKSPACE}/test_runner/performance/benchbase_tpc_c_helpers/upload_results_to_perf_test_results.py" .
echo "Processing load phase metrics..."
# Find and process load log
LOAD_LOG=$(find . -name "load_${WAREHOUSES}_warehouses.log" -type f | head -1)
if [ -n "$LOAD_LOG" ]; then
echo "Processing load metrics from: $LOAD_LOG"
python upload_results_to_perf_test_results.py \
--load-log "$LOAD_LOG" \
--run-type "load" \
--warehouses "${WAREHOUSES}" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Load log file not found: load_${WAREHOUSES}_warehouses.log"
fi
echo "Processing warmup results for optimal rate..."
# Find and process warmup results
WARMUP_CSV=$(find results_warmup -name "*.results.csv" -type f | head -1)
WARMUP_JSON=$(find results_warmup -name "*.summary.json" -type f | head -1)
if [ -n "$WARMUP_CSV" ] && [ -n "$WARMUP_JSON" ]; then
echo "Generating warmup diagram from: $WARMUP_CSV"
python generate_diagrams.py \
--input-csv "$WARMUP_CSV" \
--output-svg "warmup_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "Warmup at max TPS"
echo "Uploading warmup metrics from: $WARMUP_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$WARMUP_JSON" \
--results-csv "$WARMUP_CSV" \
--run-type "warmup" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing warmup results files (CSV: $WARMUP_CSV, JSON: $WARMUP_JSON)"
fi
echo "Processing optimal rate results..."
# Find and process optimal rate results
OPTRATE_CSV=$(find results_opt_rate -name "*.results.csv" -type f | head -1)
OPTRATE_JSON=$(find results_opt_rate -name "*.summary.json" -type f | head -1)
if [ -n "$OPTRATE_CSV" ] && [ -n "$OPTRATE_JSON" ]; then
echo "Generating optimal rate diagram from: $OPTRATE_CSV"
python generate_diagrams.py \
--input-csv "$OPTRATE_CSV" \
--output-svg "benchmark_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "70% of max TPS"
echo "Uploading optimal rate metrics from: $OPTRATE_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$OPTRATE_JSON" \
--results-csv "$OPTRATE_CSV" \
--run-type "opt-rate" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing optimal rate results files (CSV: $OPTRATE_CSV, JSON: $OPTRATE_JSON)"
fi
echo "Processing warmup 2 results for ramp down/up phase..."
# Find and process warmup results
WARMUP_CSV=$(find results_warmup -name "*.results.csv" -type f | tail -1)
WARMUP_JSON=$(find results_warmup -name "*.summary.json" -type f | tail -1)
if [ -n "$WARMUP_CSV" ] && [ -n "$WARMUP_JSON" ]; then
echo "Generating warmup diagram from: $WARMUP_CSV"
python generate_diagrams.py \
--input-csv "$WARMUP_CSV" \
--output-svg "warmup_2_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "Warmup at max TPS"
echo "Uploading warmup metrics from: $WARMUP_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$WARMUP_JSON" \
--results-csv "$WARMUP_CSV" \
--run-type "warmup" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing warmup results files (CSV: $WARMUP_CSV, JSON: $WARMUP_JSON)"
fi
echo "Processing ramp results..."
# Find and process ramp results
RAMPUP_CSV=$(find results_ramp_up -name "*.results.csv" -type f | head -1)
RAMPUP_JSON=$(find results_ramp_up -name "*.summary.json" -type f | head -1)
if [ -n "$RAMPUP_CSV" ] && [ -n "$RAMPUP_JSON" ]; then
echo "Generating ramp diagram from: $RAMPUP_CSV"
python generate_diagrams.py \
--input-csv "$RAMPUP_CSV" \
--output-svg "ramp_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "ramp TPS down and up in 5 minute intervals"
echo "Uploading ramp metrics from: $RAMPUP_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$RAMPUP_JSON" \
--results-csv "$RAMPUP_CSV" \
--run-type "ramp-up" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing ramp results files (CSV: $RAMPUP_CSV, JSON: $RAMPUP_JSON)"
fi
# Deactivate and clean up virtual environment
deactivate
rm -rf temp_results_env
rm upload_results_to_perf_test_results.py
echo "Results processing completed and environment cleaned up"
- name: Set date for upload
id: set-date
run: echo "date=$(date +%Y-%m-%d)" >> $GITHUB_OUTPUT
- name: Configure AWS credentials # necessary to upload results
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: us-east-2
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 900 # 900 is minimum value
- name: Upload benchmark results to S3
env:
S3_BUCKET: neon-public-benchmark-results
S3_PREFIX: benchbase-tpc-c/${{ steps.set-date.outputs.date }}/${{ github.run_id }}/${{ matrix.warehouses }}-warehouses
run: |
echo "Redacting passwords from configuration files before upload..."
# Mask all passwords in XML config files
find "${RUNNER_TEMP}/configs" -name "*.xml" -type f -exec sed -i 's|<password>[^<]*</password>|<password>redacted</password>|g' {} \;
echo "Uploading benchmark results to s3://${S3_BUCKET}/${S3_PREFIX}/"
# Upload the entire benchmark directory recursively
aws s3 cp --only-show-errors --recursive "${RUNNER_TEMP}" s3://${S3_BUCKET}/${S3_PREFIX}/
echo "Upload completed"
- name: Delete Neon Project
if: ${{ always() }}
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project-tpcc.outputs.project_id }}
api_key: ${{ secrets.NEON_PRODUCTION_API_KEY_4_BENCHMARKS }}
api_host: console.neon.tech # production (!)

View File

@@ -418,7 +418,7 @@ jobs:
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PROJECT_ID: ${{ vars.PREWARM_PROJECT_ID }}
PGBENCH_SIZE: ${{ vars.PREWARM_PGBENCH_SIZE }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output

View File

@@ -146,9 +146,7 @@ jobs:
with:
file: build-tools/Dockerfile
context: .
attests: |
type=provenance,mode=max
type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1
provenance: false
push: true
pull: true
build-args: |

View File

@@ -54,10 +54,6 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Install build dependencies
run: |

View File

@@ -632,11 +632,7 @@ jobs:
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-bookworm
DEBIAN_VERSION=bookworm
secrets: |
SUBZERO_ACCESS_TOKEN=${{ secrets.CI_ACCESS_TOKEN }}
attests: |
type=provenance,mode=max
type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1
provenance: false
push: true
pull: true
file: Dockerfile
@@ -749,9 +745,7 @@ jobs:
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
DEBIAN_VERSION=${{ matrix.version.debian }}
attests: |
type=provenance,mode=max
type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1
provenance: false
push: true
pull: true
file: compute/compute-node.Dockerfile
@@ -770,9 +764,7 @@ jobs:
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
DEBIAN_VERSION=${{ matrix.version.debian }}
attests: |
type=provenance,mode=max
type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1
provenance: false
push: true
pull: true
file: compute/compute-node.Dockerfile

View File

@@ -72,7 +72,6 @@ jobs:
check-macos-build:
needs: [ check-permissions, files-changed ]
uses: ./.github/workflows/build-macos.yml
secrets: inherit
with:
pg_versions: ${{ needs.files-changed.outputs.postgres_changes }}
rebuild_rust_code: ${{ fromJSON(needs.files-changed.outputs.rebuild_rust_code) }}

View File

@@ -48,20 +48,8 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
generate-ch-tmppw:
runs-on: ubuntu-22.04
outputs:
tmp_val: ${{ steps.pwgen.outputs.tmp_val }}
steps:
- name: Generate a random password
id: pwgen
run: |
set +x
p=$(dd if=/dev/random bs=14 count=1 2>/dev/null | base64)
echo tmp_val="${p//\//}" >> "${GITHUB_OUTPUT}"
test-logical-replication:
needs: [ build-build-tools-image, generate-ch-tmppw ]
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04
container:
@@ -72,21 +60,16 @@ jobs:
options: --init --user root
services:
clickhouse:
image: clickhouse/clickhouse-server:25.6
env:
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
PGSSLCERT: /tmp/postgresql.crt
image: clickhouse/clickhouse-server:24.6.3.64
ports:
- 9000:9000
- 8123:8123
zookeeper:
image: quay.io/debezium/zookeeper:3.1.3.Final
image: quay.io/debezium/zookeeper:2.7
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:3.1.3.Final
image: quay.io/debezium/kafka:2.7
env:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
@@ -96,7 +79,7 @@ jobs:
ports:
- 9092:9092
debezium:
image: quay.io/debezium/connect:3.1.3.Final
image: quay.io/debezium/connect:2.7
env:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
@@ -142,7 +125,6 @@ jobs:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
- name: Delete Neon Project
if: always()

View File

@@ -3,7 +3,7 @@ name: Periodic proxy performance test on unit-perf-aws-arm runners
on:
push: # TODO: remove after testing
branches:
- test-proxy-bench # Runs on pushes to test-proxy-bench branch
- test-proxy-bench # Runs on pushes to branches starting with test-proxy-bench
# schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
@@ -32,7 +32,7 @@ jobs:
statuses: write
contents: write
pull-requests: write
runs-on: [ self-hosted, unit-perf-aws-arm ]
runs-on: [self-hosted, unit-perf-aws-arm]
timeout-minutes: 60 # 1h timeout
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
@@ -55,58 +55,30 @@ jobs:
{
echo "PROXY_BENCH_PATH=$PROXY_BENCH_PATH"
echo "NEON_DIR=${RUNNER_TEMP}/neon"
echo "NEON_PROXY_PATH=${RUNNER_TEMP}/neon/bin/proxy"
echo "TEST_OUTPUT=${PROXY_BENCH_PATH}/test_output"
echo ""
} >> "$GITHUB_ENV"
- name: Cache poetry deps
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: show ulimits
shell: bash -euxo pipefail {0}
run: |
ulimit -a
- name: Run proxy-bench
working-directory: ${{ env.PROXY_BENCH_PATH }}
run: ./run.sh --with-grafana --bare-metal
run: ${PROXY_BENCH_PATH}/run.sh
- name: Ingest Bench Results
- name: Ingest Bench Results # neon repo script
if: always()
working-directory: ${{ env.NEON_DIR }}
run: |
mkdir -p $TEST_OUTPUT
python $NEON_DIR/scripts/proxy_bench_results_ingest.py --out $TEST_OUTPUT
- name: Push Metrics to Proxy perf database
shell: bash -euxo pipefail {0}
if: always()
env:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PROXY_TEST_RESULT_CONNSTR }}"
REPORT_FROM: $TEST_OUTPUT
working-directory: ${{ env.NEON_DIR }}
run: $NEON_DIR/scripts/generate_and_push_perf_report.sh
- name: Docker cleanup
if: always()
run: docker compose down
- name: Notify Failure
if: failure()
run: echo "Proxy bench job failed" && exit 1
- name: Cleanup Test Resources
if: always()
shell: bash -euxo pipefail {0}
run: |
# Cleanup the test resources
if [[ -d "${TEST_OUTPUT}" ]]; then
rm -rf ${TEST_OUTPUT}
fi
if [[ -d "${PROXY_BENCH_PATH}/test_output" ]]; then
rm -rf ${PROXY_BENCH_PATH}/test_output
fi
run: echo "Proxy bench job failed" && exit 1

5
.gitignore vendored
View File

@@ -26,14 +26,9 @@ docker-compose/docker-compose-parallel.yml
*.o
*.so
*.Po
*.pid
# pgindent typedef lists
*.list
# Node
**/node_modules/
# various files for local testing
/proxy/.subzero
local_proxy.json

598
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -46,7 +46,6 @@ members = [
"libs/proxy/json",
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/subzero_core",
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"pgxn/neon/communicator",
@@ -135,7 +134,7 @@ lock_api = "0.4.13"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
moka = { version = "0.12", features = ["sync"] }
memoffset = "0.9"
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
# Do not update to >= 7.0.0, at least. The update will have a significant impact
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
@@ -143,10 +142,10 @@ notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.19"
once_cell = "1.13"
opentelemetry = "0.30"
opentelemetry_sdk = "0.30"
opentelemetry-otlp = { version = "0.30", default-features = false, features = ["http-proto", "trace", "http", "reqwest-blocking-client"] }
opentelemetry-semantic-conventions = "0.30"
opentelemetry = "0.27"
opentelemetry_sdk = "0.27"
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.27"
parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
@@ -158,13 +157,11 @@ procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13.5"
prost-types = "0.13.5"
rand = "0.9"
# Remove after p256 is updated to 0.14.
rand_core = "=0.6"
rand = "0.8"
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_30"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
reqwest-middleware = "0.4"
reqwest-retry = "0.7"
routerify = "3"
@@ -214,15 +211,17 @@ tonic = { version = "0.13.1", default-features = false, features = ["channel", "
tonic-reflection = { version = "0.13.1", features = ["server"] }
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
tower-otel = { version = "0.6", features = ["axum"] }
# This revision uses opentelemetry 0.27. There's no tag for it.
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"
tracing-log = "0.2"
tracing-opentelemetry = "0.31"
tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
tracing-appender = "0.2.3"
try-lock = "0.2.5"
test-log = { version = "0.2.17", default-features = false, features = ["log"] }
twox-hash = { version = "1.6.3", default-features = false }
@@ -233,10 +232,9 @@ uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
rustls-native-certs = "0.8"
whoami = "1.5.1"
zerocopy = { version = "0.8", features = ["derive", "simd"] }
json-structural-diff = { version = "0.2.0" }
x509-cert = { version = "0.2.5" }
zerocopy = { version = "0.8", features = ["derive", "simd"] }
zeroize = "1.8"
## TODO replace this with tracing
env_logger = "0.11"

View File

@@ -63,14 +63,7 @@ WORKDIR /home/nonroot
COPY --chown=nonroot . .
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero" && \
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a; \
fi \
&& cargo chef prepare --recipe-path recipe.json
RUN cargo chef prepare --recipe-path recipe.json
# Main build image
FROM $REPOSITORY/$IMAGE:$TAG AS build
@@ -78,33 +71,20 @@ WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG ADDITIONAL_RUSTFLAGS=""
ENV CARGO_FEATURES="default"
# 3. Build cargo dependencies. Note that this step doesn't depend on anything else than
# `recipe.json`, so the layer can be reused as long as none of the dependencies change.
COPY --from=plan /home/nonroot/recipe.json recipe.json
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"; \
fi \
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo chef cook --locked --release --recipe-path recipe.json
# Perform the main build. We reuse the Postgres build artifacts from the intermediate 'pg-build'
# layer, and the cargo dependencies built in the previous step.
COPY --chown=nonroot --from=pg-build /home/nonroot/pg_install/ pg_install
COPY --chown=nonroot . .
COPY --chown=nonroot --from=plan /home/nonroot/proxy/Cargo.toml proxy/Cargo.toml
COPY --chown=nonroot --from=plan /home/nonroot/Cargo.lock Cargo.lock
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_FEATURES="rest_broker"; \
fi \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo auditable build \
--features $CARGO_FEATURES \
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -39,13 +39,13 @@ COPY build-tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \
apt-get update && \
apt-get install -y --no-install-recommends \
apt update && \
apt install -y --no-install-recommends \
ca-certificates wget gpg && \
wget -qO - https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor -o /usr/share/keyrings/postgresql-keyring.gpg && \
echo "deb [signed-by=/usr/share/keyrings/postgresql-keyring.gpg] http://apt.postgresql.org/pub/repos/apt bookworm-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
apt-get update && \
apt-get install -y --no-install-recommends \
apt install -y --no-install-recommends \
build-essential \
autotools-dev \
libedit-dev \
@@ -89,7 +89,8 @@ RUN useradd -ms /bin/bash nonroot -b /home
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
RUN mkdir -p /pgcopydb/{bin,lib} && \
RUN mkdir -p /pgcopydb/bin && \
mkdir -p /pgcopydb/lib && \
chmod -R 755 /pgcopydb && \
chown -R nonroot:nonroot /pgcopydb
@@ -105,8 +106,8 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
# 'gdb' is included so that we get backtraces of core dumps produced in
# regression tests
RUN set -e \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
&& apt update \
&& apt install -y \
autoconf \
automake \
bison \
@@ -182,22 +183,22 @@ RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/
ENV LLVM_VERSION=20
RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& echo "deb http://apt.llvm.org/${DEBIAN_VERSION}/ llvm-toolchain-${DEBIAN_VERSION}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends clang-${LLVM_VERSION} llvm-${LLVM_VERSION} \
&& apt update \
&& apt install -y clang-${LLVM_VERSION} llvm-${LLVM_VERSION} \
&& bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install node
ENV NODE_VERSION=24
RUN curl -fsSL https://deb.nodesource.com/setup_${NODE_VERSION}.x | bash - \
&& apt-get install -y --no-install-recommends nodejs \
&& apt install -y nodejs \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install docker
RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION} stable" > /etc/apt/sources.list.d/docker.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends docker-ce docker-ce-cli \
&& apt update \
&& apt install -y docker-ce docker-ce-cli \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Configure sudo & docker
@@ -214,11 +215,12 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
# Mold: A Modern Linker
ENV MOLD_VERSION=v2.37.1
RUN set -e \
&& git clone -b "${MOLD_VERSION}" --depth 1 https://github.com/rui314/mold.git \
&& git clone https://github.com/rui314/mold.git \
&& mkdir mold/build \
&& cd mold/build \
&& cd mold/build \
&& git checkout ${MOLD_VERSION} \
&& cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang++ .. \
&& cmake --build . -j "$(nproc)" \
&& cmake --build . -j $(nproc) \
&& cmake --install . \
&& cd .. \
&& rm -rf mold
@@ -252,7 +254,7 @@ ENV ICU_VERSION=67.1
ENV ICU_PREFIX=/usr/local/icu
# Download and build static ICU
RUN wget -O "/tmp/libicu-${ICU_VERSION}.tgz" https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
echo "94a80cd6f251a53bd2a997f6f1b5ac6653fe791dfab66e1eb0227740fb86d5dc /tmp/libicu-${ICU_VERSION}.tgz" | sha256sum --check && \
mkdir /tmp/icu && \
pushd /tmp/icu && \
@@ -263,7 +265,8 @@ RUN wget -O "/tmp/libicu-${ICU_VERSION}.tgz" https://github.com/unicode-org/icu/
make install && \
popd && \
rm -rf icu && \
rm -f /tmp/libicu-${ICU_VERSION}.tgz
rm -f /tmp/libicu-${ICU_VERSION}.tgz && \
popd
# Switch to nonroot user
USER nonroot:nonroot
@@ -276,19 +279,19 @@ ENV PYTHON_VERSION=3.11.12 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \
&& cd "$HOME" \
&& cd $HOME \
&& curl -sSO https://raw.githubusercontent.com/pyenv/pyenv-installer/master/bin/pyenv-installer \
&& chmod +x pyenv-installer \
&& ./pyenv-installer \
&& export PYENV_ROOT=/home/nonroot/.pyenv \
&& export PATH="$PYENV_ROOT/bin:$PATH" \
&& export PATH="$PYENV_ROOT/shims:$PATH" \
&& pyenv install "${PYTHON_VERSION}" \
&& pyenv global "${PYTHON_VERSION}" \
&& pyenv install ${PYTHON_VERSION} \
&& pyenv global ${PYTHON_VERSION} \
&& python --version \
&& pip install --no-cache-dir --upgrade pip \
&& pip install --upgrade pip \
&& pip --version \
&& pip install --no-cache-dir pipenv wheel poetry
&& pip install pipenv wheel poetry
# Switch to nonroot user (again)
USER nonroot:nonroot
@@ -299,7 +302,6 @@ WORKDIR /home/nonroot
ENV RUSTC_VERSION=1.88.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG CARGO_AUDITABLE_VERSION=0.7.0
ARG RUSTFILT_VERSION=0.2.1
ARG CARGO_HAKARI_VERSION=0.9.36
ARG CARGO_DENY_VERSION=0.18.2
@@ -315,16 +317,14 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools rustfmt clippy && \
cargo install cargo-auditable --locked --version "${CARGO_AUDITABLE_VERSION}" && \
cargo auditable install cargo-auditable --locked --version "${CARGO_AUDITABLE_VERSION}" --force && \
cargo auditable install rustfilt --version "${RUSTFILT_VERSION}" && \
cargo auditable install cargo-hakari --locked --version "${CARGO_HAKARI_VERSION}" && \
cargo auditable install cargo-deny --locked --version "${CARGO_DENY_VERSION}" && \
cargo auditable install cargo-hack --locked --version "${CARGO_HACK_VERSION}" && \
cargo auditable install cargo-nextest --locked --version "${CARGO_NEXTEST_VERSION}" && \
cargo auditable install cargo-chef --locked --version "${CARGO_CHEF_VERSION}" && \
cargo auditable install diesel_cli --locked --version "${CARGO_DIESEL_CLI_VERSION}" \
--features postgres-bundled --no-default-features && \
cargo install rustfilt --locked --version ${RUSTFILT_VERSION} && \
cargo install cargo-hakari --locked --version ${CARGO_HAKARI_VERSION} && \
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
cargo install cargo-hack --locked --version ${CARGO_HACK_VERSION} && \
cargo install cargo-nextest --locked --version ${CARGO_NEXTEST_VERSION} && \
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
cargo install diesel_cli --locked --version ${CARGO_DIESEL_CLI_VERSION} \
--features postgres-bundled --no-default-features && \
rm -rf /home/nonroot/.cargo/registry && \
rm -rf /home/nonroot/.cargo/git

View File

@@ -6,7 +6,7 @@
"": {
"name": "build-tools",
"devDependencies": {
"@redocly/cli": "1.34.5",
"@redocly/cli": "1.34.4",
"@sourcemeta/jsonschema": "10.0.0"
}
},
@@ -472,9 +472,9 @@
}
},
"node_modules/@redocly/cli": {
"version": "1.34.5",
"resolved": "https://registry.npmjs.org/@redocly/cli/-/cli-1.34.5.tgz",
"integrity": "sha512-5IEwxs7SGP5KEXjBKLU8Ffdz9by/KqNSeBk6YUVQaGxMXK//uYlTJIPntgUXbo1KAGG2d2q2XF8y4iFz6qNeiw==",
"version": "1.34.4",
"resolved": "https://registry.npmjs.org/@redocly/cli/-/cli-1.34.4.tgz",
"integrity": "sha512-seH/GgrjSB1EeOsgJ/4Ct6Jk2N7sh12POn/7G8UQFARMyUMJpe1oHtBwT2ndfp4EFCpgBAbZ/82Iw6dwczNxEA==",
"dev": true,
"license": "MIT",
"dependencies": {
@@ -484,14 +484,14 @@
"@opentelemetry/sdk-trace-node": "1.26.0",
"@opentelemetry/semantic-conventions": "1.27.0",
"@redocly/config": "^0.22.0",
"@redocly/openapi-core": "1.34.5",
"@redocly/respect-core": "1.34.5",
"@redocly/openapi-core": "1.34.4",
"@redocly/respect-core": "1.34.4",
"abort-controller": "^3.0.0",
"chokidar": "^3.5.1",
"colorette": "^1.2.0",
"core-js": "^3.32.1",
"dotenv": "16.4.7",
"form-data": "^4.0.4",
"form-data": "^4.0.0",
"get-port-please": "^3.0.1",
"glob": "^7.1.6",
"handlebars": "^4.7.6",
@@ -522,9 +522,9 @@
"license": "MIT"
},
"node_modules/@redocly/openapi-core": {
"version": "1.34.5",
"resolved": "https://registry.npmjs.org/@redocly/openapi-core/-/openapi-core-1.34.5.tgz",
"integrity": "sha512-0EbE8LRbkogtcCXU7liAyC00n9uNG9hJ+eMyHFdUsy9lB/WGqnEBgwjA9q2cyzAVcdTkQqTBBU1XePNnN3OijA==",
"version": "1.34.4",
"resolved": "https://registry.npmjs.org/@redocly/openapi-core/-/openapi-core-1.34.4.tgz",
"integrity": "sha512-hf53xEgpXIgWl3b275PgZU3OTpYh1RoD2LHdIfQ1JzBNTWsiNKczTEsI/4Tmh2N1oq9YcphhSMyk3lDh85oDjg==",
"dev": true,
"license": "MIT",
"dependencies": {
@@ -544,21 +544,21 @@
}
},
"node_modules/@redocly/respect-core": {
"version": "1.34.5",
"resolved": "https://registry.npmjs.org/@redocly/respect-core/-/respect-core-1.34.5.tgz",
"integrity": "sha512-GheC/g/QFztPe9UA9LamooSplQuy9pe0Yr8XGTqkz0ahivLDl7svoy/LSQNn1QH3XGtLKwFYMfTwFR2TAYyh5Q==",
"version": "1.34.4",
"resolved": "https://registry.npmjs.org/@redocly/respect-core/-/respect-core-1.34.4.tgz",
"integrity": "sha512-MitKyKyQpsizA4qCVv+MjXL4WltfhFQAoiKiAzrVR1Kusro3VhYb6yJuzoXjiJhR0ukLP5QOP19Vcs7qmj9dZg==",
"dev": true,
"license": "MIT",
"dependencies": {
"@faker-js/faker": "^7.6.0",
"@redocly/ajv": "8.11.2",
"@redocly/openapi-core": "1.34.5",
"@redocly/openapi-core": "1.34.4",
"better-ajv-errors": "^1.2.0",
"colorette": "^2.0.20",
"concat-stream": "^2.0.0",
"cookie": "^0.7.2",
"dotenv": "16.4.7",
"form-data": "^4.0.4",
"form-data": "4.0.0",
"jest-diff": "^29.3.1",
"jest-matcher-utils": "^29.3.1",
"js-yaml": "4.1.0",
@@ -582,6 +582,21 @@
"dev": true,
"license": "MIT"
},
"node_modules/@redocly/respect-core/node_modules/form-data": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz",
"integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==",
"dev": true,
"license": "MIT",
"dependencies": {
"asynckit": "^0.4.0",
"combined-stream": "^1.0.8",
"mime-types": "^2.1.12"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/@sinclair/typebox": {
"version": "0.27.8",
"resolved": "https://registry.npmjs.org/@sinclair/typebox/-/typebox-0.27.8.tgz",
@@ -1330,9 +1345,9 @@
"license": "MIT"
},
"node_modules/form-data": {
"version": "4.0.4",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.4.tgz",
"integrity": "sha512-KrGhL9Q4zjj0kiUt5OO4Mr/A/jlI2jDYs5eHBpYHPcBEVSiipAvn2Ko2HnPe20rmcuuvMHNdZFp+4IlGTMF0Ow==",
"version": "4.0.3",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.3.tgz",
"integrity": "sha512-qsITQPfmvMOSAdeyZ+12I1c+CKSstAFAwu+97zrnWAbIr5u8wfsExUzCesVLC8NgHuRUqNN4Zy6UPWUTRGslcA==",
"dev": true,
"license": "MIT",
"dependencies": {

View File

@@ -2,7 +2,7 @@
"name": "build-tools",
"private": true,
"devDependencies": {
"@redocly/cli": "1.34.5",
"@redocly/cli": "1.34.4",
"@sourcemeta/jsonschema": "10.0.0"
}
}

View File

@@ -133,7 +133,7 @@ RUN case $DEBIAN_VERSION in \
# Install newer version (3.25) from backports.
# libstdc++-10-dev is required for plv8
bullseye) \
echo "deb http://archive.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
VERSION_INSTALLS="cmake/bullseye-backports cmake-data/bullseye-backports libstdc++-10-dev"; \
;; \
# Version-specific installs for Bookworm (PG17):

View File

@@ -1 +1 @@
SELECT num_requested AS checkpoints_req FROM pg_catalog.pg_stat_checkpointer;
SELECT num_requested AS checkpoints_req FROM pg_stat_checkpointer;

View File

@@ -1 +1 @@
SELECT checkpoints_req FROM pg_catalog.pg_stat_bgwriter;
SELECT checkpoints_req FROM pg_stat_bgwriter;

View File

@@ -1 +1 @@
SELECT checkpoints_timed FROM pg_catalog.pg_stat_bgwriter;
SELECT checkpoints_timed FROM pg_stat_bgwriter;

View File

@@ -1 +1 @@
SELECT (neon.backpressure_throttling_time()::pg_catalog.float8 / 1000000) AS throttled;
SELECT (neon.backpressure_throttling_time()::float8 / 1000000) AS throttled;

View File

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_replay_lsn() - '0/0')::pg_catalog.FLOAT8
ELSE (pg_catalog.pg_current_wal_lsn() - '0/0')::pg_catalog.FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_replay_lsn() - '0/0')::FLOAT8
ELSE (pg_current_wal_lsn() - '0/0')::FLOAT8
END AS lsn;

View File

@@ -1,7 +1,7 @@
SELECT
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COUNT(*) FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;

View File

@@ -1,7 +1,7 @@
SELECT
(SELECT pg_catalog.current_setting('neon.timeline_id')) AS timeline_id,
(SELECT current_setting('neon.timeline_id')) AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(pg_catalog.sum(size), 0) FROM pg_catalog.pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;
(SELECT COALESCE(sum(size), 0) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;

View File

@@ -1,9 +1,9 @@
SELECT
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(pg_catalog.sum((pg_catalog.pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
(SELECT COALESCE(sum((pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
) AS logical_snapshots_bytes;

View File

@@ -1 +1 @@
SELECT pg_catalog.current_setting('max_connections') AS max_connections;
SELECT current_setting('max_connections') as max_connections;

View File

@@ -1,4 +1,4 @@
SELECT datname database_name,
pg_catalog.age(datfrozenxid) frozen_xid_age
FROM pg_catalog.pg_database
age(datfrozenxid) frozen_xid_age
FROM pg_database
ORDER BY frozen_xid_age DESC LIMIT 10;

View File

@@ -1,4 +1,4 @@
SELECT datname database_name,
pg_catalog.mxid_age(datminmxid) min_mxid_age
FROM pg_catalog.pg_database
mxid_age(datminmxid) min_mxid_age
FROM pg_database
ORDER BY min_mxid_age DESC LIMIT 10;

View File

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_receive_lsn() - '0/0')::pg_catalog.FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_receive_lsn() - '0/0')::FLOAT8
ELSE 0
END AS lsn;

View File

@@ -1 +1 @@
SELECT subenabled::pg_catalog.text AS enabled, pg_catalog.count(*) AS subscriptions_count FROM pg_catalog.pg_subscription GROUP BY subenabled;
SELECT subenabled::text AS enabled, count(*) AS subscriptions_count FROM pg_subscription GROUP BY subenabled;

View File

@@ -1 +1 @@
SELECT datname, state, pg_catalog.count(*) AS count FROM pg_catalog.pg_stat_activity WHERE state <> '' GROUP BY datname, state;
SELECT datname, state, count(*) AS count FROM pg_stat_activity WHERE state <> '' GROUP BY datname, state;

View File

@@ -1,5 +1,5 @@
SELECT pg_catalog.sum(pg_catalog.pg_database_size(datname)) AS total
FROM pg_catalog.pg_database
SELECT sum(pg_database_size(datname)) AS total
FROM pg_database
-- Ignore invalid databases, as we will likely have problems with
-- getting their size from the Pageserver.
WHERE datconnlimit != -2;

View File

@@ -3,6 +3,6 @@
-- minutes.
SELECT
x::pg_catalog.text AS duration_seconds,
x::text as duration_seconds,
neon.approximate_working_set_size_seconds(x) AS size
FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 60)) AS t (x);

View File

@@ -3,6 +3,6 @@
SELECT
x AS duration,
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::pg_catalog.interval)::pg_catalog.int4) AS size FROM (
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::interval)::int) AS size FROM (
VALUES ('5m'), ('15m'), ('1h')
) AS t (x);

View File

@@ -1 +1 @@
SELECT pg_catalog.pg_size_bytes(pg_catalog.current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;
SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;

View File

@@ -1,3 +1,3 @@
SELECT slot_name, (restart_lsn - '0/0')::pg_catalog.FLOAT8 AS restart_lsn
FROM pg_catalog.pg_replication_slots
SELECT slot_name, (restart_lsn - '0/0')::FLOAT8 as restart_lsn
FROM pg_replication_slots
WHERE slot_type = 'logical';

View File

@@ -1 +1 @@
SELECT setting::pg_catalog.int4 AS max_cluster_size FROM pg_catalog.pg_settings WHERE name = 'neon.max_cluster_size';
SELECT setting::int AS max_cluster_size FROM pg_settings WHERE name = 'neon.max_cluster_size';

View File

@@ -1,13 +1,13 @@
-- We export stats for 10 non-system databases. Without this limit it is too
-- easy to abuse the system by creating lots of databases.
SELECT pg_catalog.pg_database_size(datname) AS db_size,
SELECT pg_database_size(datname) AS db_size,
deadlocks,
tup_inserted AS inserted,
tup_updated AS updated,
tup_deleted AS deleted,
datname
FROM pg_catalog.pg_stat_database
FROM pg_stat_database
WHERE datname IN (
SELECT datname FROM pg_database
-- Ignore invalid databases, as we will likely have problems with

View File

@@ -3,4 +3,4 @@
-- replay LSN may have advanced past the receive LSN we are using for the
-- calculation.
SELECT GREATEST(0, pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_last_wal_receive_lsn(), pg_catalog.pg_last_wal_replay_lsn())) AS replication_delay_bytes;
SELECT GREATEST(0, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())) AS replication_delay_bytes;

View File

@@ -1,5 +1,5 @@
SELECT
CASE
WHEN pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM pg_catalog.now() - pg_catalog.pg_last_xact_replay_timestamp()))
WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()))
END AS replication_delay_seconds;

View File

@@ -1,10 +1,10 @@
SELECT
slot_name,
pg_catalog.pg_wal_lsn_diff(
pg_wal_lsn_diff(
CASE
WHEN pg_catalog.pg_is_in_recovery() THEN pg_catalog.pg_last_wal_replay_lsn()
ELSE pg_catalog.pg_current_wal_lsn()
WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn()
ELSE pg_current_wal_lsn()
END,
restart_lsn)::pg_catalog.FLOAT8 AS retained_wal
FROM pg_catalog.pg_replication_slots
restart_lsn)::FLOAT8 AS retained_wal
FROM pg_replication_slots
WHERE active = false;

View File

@@ -4,4 +4,4 @@ SELECT
WHEN wal_status = 'lost' THEN 1
ELSE 0
END AS wal_is_lost
FROM pg_catalog.pg_replication_slots;
FROM pg_replication_slots;

View File

@@ -1,11 +1,5 @@
commit 5eb393810cf7c7bafa4e394dad2e349e2a8cb2cb
Author: Alexey Masterov <alexey.masterov@databricks.com>
Date: Mon Jul 28 18:11:02 2025 +0200
Patch for pg_repack
diff --git a/regress/Makefile b/regress/Makefile
index bf6edcb..110e734 100644
index bf6edcb..89b4c7f 100644
--- a/regress/Makefile
+++ b/regress/Makefile
@@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\}
@@ -13,36 +7,18 @@ index bf6edcb..110e734 100644
#
-REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger
+REGRESS := init-extension noautovacuum repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger autovacuum
+REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger
USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/regress/expected/autovacuum.out b/regress/expected/autovacuum.out
new file mode 100644
index 0000000..e7f2363
--- /dev/null
+++ b/regress/expected/autovacuum.out
@@ -0,0 +1,7 @@
+ALTER SYSTEM SET autovacuum='on';
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
diff --git a/regress/expected/noautovacuum.out b/regress/expected/noautovacuum.out
new file mode 100644
index 0000000..fc7978e
--- /dev/null
+++ b/regress/expected/noautovacuum.out
@@ -0,0 +1,7 @@
+ALTER SYSTEM SET autovacuum='off';
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out
index 9f2e171..f6e4f8d 100644
--- a/regress/expected/init-extension.out
+++ b/regress/expected/init-extension.out
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
index 8d0a94e..63b68bf 100644
--- a/regress/expected/nosuper.out
@@ -74,22 +50,14 @@ index 8d0a94e..63b68bf 100644
INFO: repacking table "public.tbl_cluster"
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
DETAIL: query was: RESET lock_timeout
diff --git a/regress/sql/autovacuum.sql b/regress/sql/autovacuum.sql
new file mode 100644
index 0000000..a8eda63
--- /dev/null
+++ b/regress/sql/autovacuum.sql
@@ -0,0 +1,2 @@
+ALTER SYSTEM SET autovacuum='on';
+SELECT pg_reload_conf();
diff --git a/regress/sql/noautovacuum.sql b/regress/sql/noautovacuum.sql
new file mode 100644
index 0000000..13d4836
--- /dev/null
+++ b/regress/sql/noautovacuum.sql
@@ -0,0 +1,2 @@
+ALTER SYSTEM SET autovacuum='off';
+SELECT pg_reload_conf();
diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql
index 9f2e171..f6e4f8d 100644
--- a/regress/sql/init-extension.sql
+++ b/regress/sql/init-extension.sql
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
index 072f0fa..dbe60f8 100644
--- a/regress/sql/nosuper.sql

View File

@@ -26,13 +26,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
# Turn off database collector (`--no-collector.database`), we don't use `pg_database_size_bytes` metric anyway, see
# https://github.com/neondatabase/flux-fleet/blob/5e19b3fd897667b70d9a7ad4aa06df0ca22b49ff/apps/base/compute-metrics/scrape-compute-pg-exporter-neon.yaml#L29
# but it's enabled by default and it doesn't filter out invalid databases, see
# https://github.com/prometheus-community/postgres_exporter/blob/06a553c8166512c9d9c5ccf257b0f9bba8751dbc/collector/pg_database.go#L67
# so if it hits one, it starts spamming logs
# ERROR: [NEON_SMGR] [reqid d9700000018] could not read db size of db 705302 from page server at lsn 5/A2457EB0
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --no-collector.database --config.file=/etc/postgres_exporter.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn

View File

@@ -26,13 +26,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
# Turn off database collector (`--no-collector.database`), we don't use `pg_database_size_bytes` metric anyway, see
# https://github.com/neondatabase/flux-fleet/blob/5e19b3fd897667b70d9a7ad4aa06df0ca22b49ff/apps/base/compute-metrics/scrape-compute-pg-exporter-neon.yaml#L29
# but it's enabled by default and it doesn't filter out invalid databases, see
# https://github.com/prometheus-community/postgres_exporter/blob/06a553c8166512c9d9c5ccf257b0f9bba8751dbc/collector/pg_database.go#L67
# so if it hits one, it starts spamming logs
# ERROR: [NEON_SMGR] [reqid d9700000018] could not read db size of db 705302 from page server at lsn 5/A2457EB0
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --no-collector.database --config.file=/etc/postgres_exporter.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn

View File

@@ -27,10 +27,7 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
http-body-util.workspace = true
hostname-validator = "1.1"
hyper.workspace = true
hyper-util.workspace = true
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
@@ -47,7 +44,6 @@ postgres.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
ring = "0.17"
scopeguard.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
@@ -62,7 +58,6 @@ tokio-stream.workspace = true
tonic.workspace = true
tower-otel.workspace = true
tracing.workspace = true
tracing-appender.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true

View File

@@ -52,14 +52,8 @@ stateDiagram-v2
Init --> Running : Started Postgres
Running --> TerminationPendingFast : Requested termination
Running --> TerminationPendingImmediate : Requested termination
Running --> ConfigurationPending : Received a /configure request with spec
Running --> RefreshConfigurationPending : Received a /refresh_configuration request, compute node will pull a new spec and reconfigure
RefreshConfigurationPending --> RefreshConfiguration: Received compute spec and started configuration
RefreshConfiguration --> Running : Compute has been re-configured
RefreshConfiguration --> RefreshConfigurationPending : Configuration failed and to be retried
TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status
TerminationPendingImmediate --> Terminated : Terminated compute immediately
Failed --> RefreshConfigurationPending : Received a /refresh_configuration request
Failed --> [*] : Compute exited
Terminated --> [*] : Compute exited
```

View File

@@ -49,10 +49,9 @@ use compute_tools::compute::{
BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
};
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;
use compute_tools::pg_isready::get_pg_isready_bin;
use compute_tools::spec::*;
use compute_tools::{hadron_metrics, installed_extensions, logger::*};
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;
@@ -82,15 +81,6 @@ struct Cli {
#[arg(long, default_value_t = 3081)]
pub internal_http_port: u16,
/// Backwards-compatible --http-port for Hadron deployments. Functionally the
/// same as --external-http-port.
#[arg(
long,
conflicts_with = "external_http_port",
conflicts_with = "internal_http_port"
)]
pub http_port: Option<u16>,
#[arg(short = 'D', long, value_name = "DATADIR")]
pub pgdata: String,
@@ -148,12 +138,6 @@ struct Cli {
/// Run in development mode, skipping VM-specific operations like process termination
#[arg(long, action = clap::ArgAction::SetTrue)]
pub dev: bool,
#[arg(long)]
pub pg_init_timeout: Option<u64>,
#[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
pub lakebase_mode: bool,
}
impl Cli {
@@ -190,26 +174,6 @@ impl Cli {
}
}
// Hadron helpers to get compatible compute_ctl http ports from Cli. The old `--http-port`
// arg is used and acts the same as `--external-http-port`. The internal http port is defined
// to be http_port + 1. Hadron runs in the dblet environment which uses the host network, so
// we need to be careful with the ports to choose.
fn get_external_http_port(cli: &Cli) -> u16 {
if cli.lakebase_mode {
return cli.http_port.unwrap_or(cli.external_http_port);
}
cli.external_http_port
}
fn get_internal_http_port(cli: &Cli) -> u16 {
if cli.lakebase_mode {
return cli
.http_port
.map(|p| p + 1)
.unwrap_or(cli.internal_http_port);
}
cli.internal_http_port
}
fn main() -> Result<()> {
let cli = Cli::parse();
@@ -224,28 +188,15 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
let mut log_dir = None;
if cli.lakebase_mode {
log_dir = std::env::var("COMPUTE_CTL_LOG_DIRECTORY").ok();
}
let (tracing_provider, _file_logs_guard) = init(cli.dev, log_dir)?;
runtime.block_on(init(cli.dev))?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
if cli.lakebase_mode {
installed_extensions::initialize_metrics();
hadron_metrics::initialize_metrics();
}
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
let config = get_config(&cli)?;
let external_http_port = get_external_http_port(&cli);
let internal_http_port = get_internal_http_port(&cli);
let compute_node = ComputeNode::new(
ComputeNodeParams {
compute_id: cli.compute_id,
@@ -254,8 +205,8 @@ fn main() -> Result<()> {
pgdata: cli.pgdata.clone(),
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port,
internal_http_port,
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port,
remote_ext_base_url: cli.remote_ext_base_url.clone(),
resize_swap_on_bind: cli.resize_swap_on_bind,
set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
@@ -268,32 +219,19 @@ fn main() -> Result<()> {
installed_extensions_collection_interval: Arc::new(AtomicU64::new(
cli.installed_extensions_collection_interval,
)),
pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
instance_id: std::env::var("INSTANCE_ID").ok(),
lakebase_mode: cli.lakebase_mode,
build_tag: BUILD_TAG.to_string(),
control_plane_uri: cli.control_plane_uri,
config_path_test_only: cli.config,
},
config,
)?;
let exit_code = compute_node.run().context("running compute node")?;
let exit_code = compute_node.run()?;
scenario.teardown();
deinit_and_exit(tracing_provider, exit_code);
deinit_and_exit(exit_code);
}
fn init(
dev_mode: bool,
log_dir: Option<String>,
) -> Result<(
Option<tracing_utils::Provider>,
Option<tracing_appender::non_blocking::WorkerGuard>,
)> {
let (provider, file_logs_guard) = init_tracing_and_logging(DEFAULT_LOG_LEVEL, &log_dir)?;
async fn init(dev_mode: bool) -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
@@ -304,7 +242,7 @@ fn init(
info!("compute build_tag: {}", &BUILD_TAG.to_string());
Ok((provider, file_logs_guard))
Ok(())
}
fn get_config(cli: &Cli) -> Result<ComputeConfig> {
@@ -329,27 +267,25 @@ fn get_config(cli: &Cli) -> Result<ComputeConfig> {
}
}
fn deinit_and_exit(tracing_provider: Option<tracing_utils::Provider>, exit_code: Option<i32>) -> ! {
if let Some(p) = tracing_provider {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
_ = p.shutdown();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
}
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
tracing_utils::shutdown_tracing();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
}
info!("shutting down");

View File

@@ -24,9 +24,9 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
});
let query = "
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = pg_catalog.now();";
SET updated_at = now();";
match client.simple_query(query).await {
Result::Ok(result) => {

View File

@@ -1,98 +0,0 @@
//! Client for making request to a running Postgres server's communicator control socket.
//!
//! The storage communicator process that runs inside Postgres exposes an HTTP endpoint in
//! a Unix Domain Socket in the Postgres data directory. This provides access to it.
use std::path::Path;
use anyhow::Context;
use hyper::client::conn::http1::SendRequest;
use hyper_util::rt::TokioIo;
/// Name of the socket within the Postgres data directory. This better match that in
/// `pgxn/neon/communicator/src/lib.rs`.
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
/// Open a connection to the communicator's control socket, prepare to send requests to it
/// with hyper.
pub async fn connect_communicator_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>>
where
B: hyper::body::Body + 'static + Send,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME);
let socket_path_len = socket_path.display().to_string().len();
// There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a
// Unix Domain socket. The limit is on the connect(2) function used to open the
// socket, not on the absolute path itself. Postgres changes the current directory to
// the data directory and uses a relative path to bind to the socket, and the relative
// path "./neon-communicator.socket" is always short, but when compute_ctl needs to
// open the socket, we need to use a full path, which can be arbitrarily long.
//
// There are a few ways we could work around this:
//
// 1. Change the current directory to the Postgres data directory and use a relative
// path in the connect(2) call. That's problematic because the current directory
// applies to the whole process. We could change the current directory early in
// compute_ctl startup, and that might be a good idea anyway for other reasons too:
// it would be more robust if the data directory is moved around or unlinked for
// some reason, and you would be less likely to accidentally litter other parts of
// the filesystem with e.g. temporary files. However, that's a pretty invasive
// change.
//
// 2. On Linux, you could open() the data directory, and refer to the the socket
// inside it as "/proc/self/fd/<fd>/neon-communicator.socket". But that's
// Linux-only.
//
// 3. Create a symbolic link to the socket with a shorter path, and use that.
//
// We use the symbolic link approach here. Hopefully the paths we use in production
// are shorter, so that we can open the socket directly, so that this hack is needed
// only in development.
let connect_result = if socket_path_len < 100 {
// We can open the path directly with no hacks.
tokio::net::UnixStream::connect(socket_path).await
} else {
// The path to the socket is too long. Create a symlink to it with a shorter path.
let short_path = std::env::temp_dir().join(format!(
"compute_ctl.short-socket.{}.{}",
std::process::id(),
tokio::task::id()
));
std::os::unix::fs::symlink(&socket_path, &short_path)?;
// Delete the symlink as soon as we have connected to it. There's a small chance
// of leaking if the process dies before we remove it, so try to keep that window
// as small as possible.
scopeguard::defer! {
if let Err(err) = std::fs::remove_file(&short_path) {
tracing::warn!("could not remove symlink \"{}\" created for socket: {}",
short_path.display(), err);
}
}
tracing::info!(
"created symlink \"{}\" for socket \"{}\", opening it now",
short_path.display(),
socket_path.display()
);
tokio::net::UnixStream::connect(&short_path).await
};
let stream = connect_result.context("connecting to communicator control socket")?;
let io = TokioIo::new(stream);
let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await?;
// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
if let Err(err) = connection.await {
eprintln!("Error in connection: {err}");
}
});
Ok(request_sender)
}

View File

@@ -6,8 +6,7 @@ use compute_api::responses::{
LfcPrewarmState, PromoteState, TlsConfig,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, GenericOption,
PageserverConnectionInfo, PageserverProtocol, PgIdent, Role,
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
};
use futures::StreamExt;
use futures::future::join_all;
@@ -22,7 +21,6 @@ use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use std::collections::{HashMap, HashSet};
use std::ffi::OsString;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
@@ -32,23 +30,18 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::{spawn, sync::watch, task::JoinHandle, time};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff_duration,
};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
use utils::pid_file;
use utils::shard::{ShardIndex, ShardNumber, ShardStripeSize};
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
use crate::configurator::launch_configurator;
use crate::disk_quota::set_disk_quota;
use crate::hadron_metrics::COMPUTE_ATTACHED;
use crate::installed_extensions::get_installed_extensions;
use crate::logger::{self, startup_context_from_env};
use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
use crate::monitor::launch_monitor;
@@ -120,17 +113,6 @@ pub struct ComputeNodeParams {
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: Arc<AtomicU64>,
/// Hadron instance ID of the compute node.
pub instance_id: Option<String>,
/// Timeout of PG compute startup in the Init state.
pub pg_init_timeout: Option<Duration>,
// Path to the `pg_isready` binary.
pub pg_isready_bin: String,
pub lakebase_mode: bool,
pub build_tag: String,
pub control_plane_uri: Option<String>,
pub config_path_test_only: Option<OsString>,
}
type TaskHandle = Mutex<Option<JoinHandle<()>>>;
@@ -172,7 +154,6 @@ pub struct RemoteExtensionMetrics {
#[derive(Clone, Debug)]
pub struct ComputeState {
pub start_time: DateTime<Utc>,
pub pg_start_time: Option<DateTime<Utc>>,
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity. It could be `None` if
/// compute wasn't used since start.
@@ -196,7 +177,6 @@ pub struct ComputeState {
pub startup_span: Option<tracing::span::Span>,
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_prewarm_token: CancellationToken,
pub lfc_offload_state: LfcOffloadState,
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
@@ -211,7 +191,6 @@ impl ComputeState {
pub fn new() -> Self {
Self {
start_time: Utc::now(),
pg_start_time: None,
status: ComputeStatus::Empty,
last_active: None,
error: None,
@@ -222,7 +201,6 @@ impl ComputeState {
lfc_offload_state: LfcOffloadState::default(),
terminate_flush_lsn: None,
promote_state: None,
lfc_prewarm_token: CancellationToken::new(),
}
}
@@ -255,7 +233,7 @@ pub struct ParsedSpec {
pub spec: ComputeSpec,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub pageserver_conninfo: PageserverConnectionInfo,
pub pageserver_connstr: String,
pub safekeeper_connstrings: Vec<String>,
pub storage_auth_token: Option<String>,
/// k8s dns name and port
@@ -303,47 +281,25 @@ impl ParsedSpec {
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = anyhow::Error;
fn try_from(spec: ComputeSpec) -> Result<Self, anyhow::Error> {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
// Extract the options from the spec file that are needed to connect to
// the storage system.
//
// In compute specs generated by old control plane versions, the spec file might
// be missing the `pageserver_connection_info` field. In that case, we need to dig
// the pageserver connection info from the `pageserver_connstr` field instead, or
// if that's missing too, from the GUC in the cluster.settings field.
let mut pageserver_conninfo = spec.pageserver_connection_info.clone();
if pageserver_conninfo.is_none() {
if let Some(pageserver_connstr_field) = &spec.pageserver_connstring {
pageserver_conninfo = Some(PageserverConnectionInfo::from_connstr(
pageserver_connstr_field,
spec.shard_stripe_size,
)?);
}
}
if pageserver_conninfo.is_none() {
if let Some(guc) = spec.cluster.settings.find("neon.pageserver_connstring") {
let stripe_size = if let Some(guc) = spec.cluster.settings.find("neon.stripe_size")
{
Some(ShardStripeSize(u32::from_str(&guc)?))
} else {
None
};
pageserver_conninfo =
Some(PageserverConnectionInfo::from_connstr(&guc, stripe_size)?);
}
}
let pageserver_conninfo = pageserver_conninfo.ok_or(anyhow::anyhow!(
"pageserver connection information should be provided"
))?;
// Similarly for safekeeper connection strings
// For backwards-compatibility, the top-level fields in the spec file
// may be empty. In that case, we need to dig them from the GUCs in the
// cluster.settings field.
let pageserver_connstr = spec
.pageserver_connstring
.clone()
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
.ok_or("pageserver connstr should be provided")?;
let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
if matches!(spec.mode, ComputeMode::Primary) {
spec.cluster
.settings
.find("neon.safekeepers")
.ok_or(anyhow::anyhow!("safekeeper connstrings should be provided"))?
.ok_or("safekeeper connstrings should be provided")?
.split(',')
.map(|str| str.to_string())
.collect()
@@ -358,22 +314,22 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
tenant_id
} else {
let guc = spec
.cluster
spec.cluster
.settings
.find("neon.tenant_id")
.ok_or(anyhow::anyhow!("tenant id should be provided"))?;
TenantId::from_str(&guc).context("invalid tenant id")?
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?
};
let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
timeline_id
} else {
let guc = spec
.cluster
spec.cluster
.settings
.find("neon.timeline_id")
.ok_or(anyhow::anyhow!("timeline id should be provided"))?;
TimelineId::from_str(&guc).context(anyhow::anyhow!("invalid timeline id"))?
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?
};
let endpoint_storage_addr: Option<String> = spec
@@ -387,7 +343,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
let res = ParsedSpec {
spec,
pageserver_conninfo,
pageserver_connstr,
safekeeper_connstrings,
storage_auth_token,
tenant_id,
@@ -397,7 +353,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
};
// Now check validity of the parsed specification
res.validate().map_err(anyhow::Error::msg)?;
res.validate()?;
Ok(res)
}
}
@@ -442,130 +398,6 @@ struct StartVmMonitorResult {
vm_monitor: Option<JoinHandle<Result<()>>>,
}
// BEGIN_HADRON
/// This function creates roles that are used by Databricks.
/// These roles are not needs to be botostrapped at PG Compute provisioning time.
/// The auth method for these roles are configured in databricks_pg_hba.conf in universe repository.
pub(crate) fn create_databricks_roles() -> Vec<String> {
let roles = vec![
// Role for prometheus_stats_exporter
Role {
name: "databricks_monitor".to_string(),
// This uses "local" connection and auth method for that is "trust", so no password is needed.
encrypted_password: None,
options: Some(vec![GenericOption {
name: "IN ROLE pg_monitor".to_string(),
value: None,
vartype: "string".to_string(),
}]),
},
// Role for brickstore control plane
Role {
name: "databricks_control_plane".to_string(),
// Certificate user does not need password.
encrypted_password: None,
options: Some(vec![GenericOption {
name: "SUPERUSER".to_string(),
value: None,
vartype: "string".to_string(),
}]),
},
// Role for brickstore httpgateway.
Role {
name: "databricks_gateway".to_string(),
// Certificate user does not need password.
encrypted_password: None,
options: None,
},
];
roles
.into_iter()
.map(|role| {
let query = format!(
r#"
DO $$
BEGIN
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles WHERE rolname = '{}')
THEN
CREATE ROLE {} {};
END IF;
END
$$;"#,
role.name,
role.name.pg_quote(),
role.to_pg_options(),
);
query
})
.collect()
}
/// Databricks-specific environment variables to be passed to the `postgres` sub-process.
pub struct DatabricksEnvVars {
/// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check
/// the token scopes of internal auth tokens.
pub endpoint_id: String,
/// Hostname of the Databricks workspace URL this compute instance belongs to.
/// Used by postgres to verify Databricks PAT tokens.
pub workspace_host: String,
pub lakebase_mode: bool,
}
impl DatabricksEnvVars {
pub fn new(
compute_spec: &ComputeSpec,
compute_id: Option<&String>,
instance_id: Option<String>,
lakebase_mode: bool,
) -> Self {
let endpoint_id = if let Some(instance_id) = instance_id {
// Use instance_id as endpoint_id if it is set. This code path is for PuPr model.
instance_id
} else {
// Use compute_id as endpoint_id if instance_id is not set. The code path is for PrPr model.
// compute_id is a string format of "{endpoint_id}/{compute_idx}"
// endpoint_id is a uuid. We only need to pass down endpoint_id to postgres.
// Panics if compute_id is not set or not in the expected format.
compute_id.unwrap().split('/').next().unwrap().to_string()
};
let workspace_host = compute_spec
.databricks_settings
.as_ref()
.map(|s| s.databricks_workspace_host.clone())
.unwrap_or("".to_string());
Self {
endpoint_id,
workspace_host,
lakebase_mode,
}
}
/// Constants for the names of Databricks-specific postgres environment variables.
const DATABRICKS_ENDPOINT_ID_ENVVAR: &'static str = "DATABRICKS_ENDPOINT_ID";
const DATABRICKS_WORKSPACE_HOST_ENVVAR: &'static str = "DATABRICKS_WORKSPACE_HOST";
/// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars. Consumes `self`.
pub fn to_env_var_list(self) -> Vec<(String, String)> {
if !self.lakebase_mode {
// In neon env, we don't need to pass down the env vars to postgres.
return vec![];
}
vec![
(
Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(),
self.endpoint_id.clone(),
),
(
Self::DATABRICKS_WORKSPACE_HOST_ENVVAR.to_string(),
self.workspace_host.clone(),
),
]
}
}
impl ComputeNode {
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
let connstr = params.connstr.as_str();
@@ -589,7 +421,7 @@ impl ComputeNode {
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0 -c pgaudit.log=none";
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0 -c pgaudit.log=none";
let options = match conn_conf.get_options() {
// Allow the control plane to override any options set by the
// compute
@@ -602,11 +434,7 @@ impl ComputeNode {
let mut new_state = ComputeState::new();
if let Some(spec) = config.spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
if params.lakebase_mode {
ComputeNode::set_spec(&params, &mut new_state, pspec);
} else {
new_state.pspec = Some(pspec);
}
new_state.pspec = Some(pspec);
}
Ok(ComputeNode {
@@ -651,7 +479,6 @@ impl ComputeNode {
port: this.params.external_http_port,
config: this.compute_ctl_config.clone(),
compute_id: this.params.compute_id.clone(),
instance_id: this.params.instance_id.clone(),
}
.launch(&this);
@@ -821,9 +648,6 @@ impl ComputeNode {
};
_this_entered = start_compute_span.enter();
// Hadron: Record postgres start time (used to enforce pg_init_timeout).
state_guard.pg_start_time.replace(Utc::now());
state_guard.set_status(ComputeStatus::Init, &self.state_changed);
compute_state = state_guard.clone()
}
@@ -1204,14 +1028,7 @@ impl ComputeNode {
// If it is something different then create_dir() will error out anyway.
let pgdata = &self.params.pgdata;
let _ok = fs::remove_dir_all(pgdata);
if self.params.lakebase_mode {
// Ignore creation errors if the directory already exists (e.g. mounting it ahead of time).
// If it is something different then PG startup will error out anyway.
let _ok = fs::create_dir(pgdata);
} else {
fs::create_dir(pgdata)?;
}
fs::create_dir(pgdata)?;
fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?;
Ok(())
@@ -1223,10 +1040,12 @@ impl ComputeNode {
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let started = Instant::now();
let (connected, size) = match spec.pageserver_conninfo.prefer_protocol {
PageserverProtocol::Grpc => self.try_get_basebackup_grpc(spec, lsn)?,
let (connected, size) = match PageserverProtocol::from_connstring(shard0_connstr)? {
PageserverProtocol::Libpq => self.try_get_basebackup_libpq(spec, lsn)?,
PageserverProtocol::Grpc => self.try_get_basebackup_grpc(spec, lsn)?,
};
self.fix_zenith_signal_neon_signal()?;
@@ -1264,20 +1083,23 @@ impl ComputeNode {
/// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when
/// the connection was established, and the (compressed) size of the basebackup.
fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
let shard0_index = ShardIndex {
shard_number: ShardNumber(0),
shard_count: spec.pageserver_conninfo.shard_count,
let shard0_connstr = spec
.pageserver_connstr
.split(',')
.next()
.unwrap()
.to_string();
let shard_index = match spec.pageserver_connstr.split(',').count() as u8 {
0 | 1 => ShardIndex::unsharded(),
count => ShardIndex::new(ShardNumber(0), ShardCount(count)),
};
let shard0_url = spec
.pageserver_conninfo
.shard_url(ShardNumber(0), PageserverProtocol::Grpc)?
.to_owned();
let (reader, connected) = tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::connect(
shard0_url,
shard0_connstr,
spec.tenant_id,
spec.timeline_id,
shard0_index,
shard_index,
spec.storage_auth_token.clone(),
None, // NB: base backups use payload compression
)
@@ -1309,9 +1131,7 @@ impl ComputeNode {
/// Fetches a basebackup via libpq. The connstring must use postgresql://. Returns the timestamp
/// when the connection was established, and the (compressed) size of the basebackup.
fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
let shard0_connstr = spec
.pageserver_conninfo
.shard_url(ShardNumber(0), PageserverProtocol::Libpq)?;
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let mut config = postgres::Config::from_str(shard0_connstr)?;
// Use the storage auth token from the config file, if given.
@@ -1398,7 +1218,10 @@ impl ComputeNode {
return result;
}
Err(ref e) if attempts < max_attempts => {
warn!("Failed to get basebackup: {e:?} (attempt {attempts}/{max_attempts})");
warn!(
"Failed to get basebackup: {} (attempt {}/{})",
e, attempts, max_attempts
);
std::thread::sleep(std::time::Duration::from_millis(retry_period_ms as u64));
retry_period_ms *= 1.5;
}
@@ -1560,41 +1383,6 @@ impl ComputeNode {
Ok(lsn)
}
fn sync_safekeepers_with_retries(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let max_retries = 5;
let mut attempts = 0;
loop {
let result = self.sync_safekeepers(storage_auth_token.clone());
match &result {
Ok(_) => {
if attempts > 0 {
tracing::info!("sync_safekeepers succeeded after {attempts} retries");
}
return result;
}
Err(e) if attempts < max_retries => {
tracing::info!(
"sync_safekeepers failed, will retry (attempt {attempts}): {e:#}"
);
}
Err(err) => {
tracing::warn!(
"sync_safekeepers still failed after {attempts} retries, giving up: {err:?}"
);
return result;
}
}
// sleep and retry
let backoff = exponential_backoff_duration(
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
);
std::thread::sleep(backoff);
attempts += 1;
}
}
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
@@ -1604,8 +1392,6 @@ impl ComputeNode {
let pgdata_path = Path::new(&self.params.pgdata);
let tls_config = self.tls_config(&pspec.spec);
let databricks_settings = spec.databricks_settings.as_ref();
let postgres_port = self.params.connstr.port();
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
@@ -1613,11 +1399,8 @@ impl ComputeNode {
pgdata_path,
&self.params,
&pspec.spec,
postgres_port,
self.params.internal_http_port,
tls_config,
databricks_settings,
self.params.lakebase_mode,
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -1630,7 +1413,7 @@ impl ComputeNode {
lsn
} else {
info!("starting safekeepers syncing");
self.sync_safekeepers_with_retries(pspec.storage_auth_token.clone())
self.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?
};
info!("safekeepers synced at LSN {}", lsn);
@@ -1646,31 +1429,19 @@ impl ComputeNode {
}
};
self.get_basebackup(compute_state, lsn)
.with_context(|| format!("failed to get basebackup@{lsn}"))?;
info!(
"getting basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
);
self.get_basebackup(compute_state, lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
)
})?;
if let Some(settings) = databricks_settings {
copy_tls_certificates(
&settings.pg_compute_tls_settings.key_file,
&settings.pg_compute_tls_settings.cert_file,
pgdata_path,
)?;
// Update pg_hba.conf received with basebackup including additional databricks settings.
update_pg_hba(pgdata_path, Some(&settings.databricks_pg_hba))?;
update_pg_ident(pgdata_path, Some(&settings.databricks_pg_ident))?;
} else {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path, None)?;
}
if let Some(databricks_settings) = spec.databricks_settings.as_ref() {
copy_tls_certificates(
&databricks_settings.pg_compute_tls_settings.key_file,
&databricks_settings.pg_compute_tls_settings.cert_file,
pgdata_path,
)?;
}
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
@@ -1711,7 +1482,7 @@ impl ComputeNode {
// symlink doesn't affect anything.
//
// See https://github.com/neondatabase/autoscaling/issues/800
std::fs::remove_dir_all(pgdata_path.join("pg_dynshmem"))?;
std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?;
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
match spec.mode {
@@ -1726,12 +1497,6 @@ impl ComputeNode {
/// Start and stop a postgres process to warm up the VM for startup.
pub fn prewarm_postgres_vm_memory(&self) -> Result<()> {
if self.params.lakebase_mode {
// We are running in Hadron mode. Disabling this prewarming step for now as it could run
// into dblet port conflicts and also doesn't add much value with our current infra.
info!("Skipping postgres prewarming in Hadron mode");
return Ok(());
}
info!("prewarming VM memory");
// Create pgdata
@@ -1789,36 +1554,14 @@ impl ComputeNode {
pub fn start_postgres(&self, storage_auth_token: Option<String>) -> Result<PostgresHandle> {
let pgdata_path = Path::new(&self.params.pgdata);
let env_vars: Vec<(String, String)> = if self.params.lakebase_mode {
let databricks_env_vars = {
let state = self.state.lock().unwrap();
let spec = &state.pspec.as_ref().unwrap().spec;
DatabricksEnvVars::new(
spec,
Some(&self.params.compute_id),
self.params.instance_id.clone(),
self.params.lakebase_mode,
)
};
info!(
"Starting Postgres for databricks endpoint id: {}",
&databricks_env_vars.endpoint_id
);
let mut env_vars = databricks_env_vars.to_env_var_list();
env_vars.extend(storage_auth_token.map(|t| ("NEON_AUTH_TOKEN".to_string(), t)));
env_vars
} else if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN".to_owned(), storage_auth_token.to_owned())]
} else {
vec![]
};
// Run postgres as a child process.
let mut pg = maybe_cgexec(&self.params.pgbin)
.args(["-D", &self.params.pgdata])
.envs(env_vars)
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
})
.stderr(Stdio::piped())
.spawn()
.expect("cannot start postgres process");
@@ -1925,7 +1668,7 @@ impl ComputeNode {
// It doesn't matter what were the options before, here we just want
// to connect and create a new superuser role.
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0";
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
zenith_admin_conf.options(ZENITH_OPTIONS);
let mut client =
@@ -1970,15 +1713,7 @@ impl ComputeNode {
/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
if self.params.lakebase_mode {
// Set a 2-minute statement_timeout for the session applying config. The individual SQL statements
// used in apply_spec_sql() should not take long (they are just creating users and installing
// extensions). If any of them are stuck for an extended period of time it usually indicates a
// pageserver connectivity problem and we should bail out.
conf.options("-c statement_timeout=2min");
}
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
let conf = Arc::new(conf);
let spec = Arc::new(
@@ -2011,7 +1746,6 @@ impl ComputeNode {
}
// Run migrations separately to not hold up cold starts
let lakebase_mode = self.params.lakebase_mode;
let params = self.params.clone();
tokio::spawn(async move {
let mut conf = conf.as_ref().clone();
@@ -2024,7 +1758,7 @@ impl ComputeNode {
eprintln!("connection error: {e}");
}
});
if let Err(e) = handle_migrations(params, &mut client, lakebase_mode).await {
if let Err(e) = handle_migrations(params, &mut client).await {
error!("Failed to run migrations: {}", e);
}
}
@@ -2040,34 +1774,6 @@ impl ComputeNode {
Ok::<(), anyhow::Error>(())
}
// Signal to the configurator to refresh the configuration by pulling a new spec from the HCC.
// Note that this merely triggers a notification on a condition variable the configurator thread
// waits on. The configurator thread (in configurator.rs) pulls the new spec from the HCC and
// applies it.
pub async fn signal_refresh_configuration(&self) -> Result<()> {
let states_allowing_configuration_refresh = [
ComputeStatus::Running,
ComputeStatus::Failed,
ComputeStatus::RefreshConfigurationPending,
];
let mut state = self.state.lock().expect("state lock poisoned");
if states_allowing_configuration_refresh.contains(&state.status) {
state.status = ComputeStatus::RefreshConfigurationPending;
self.state_changed.notify_all();
Ok(())
} else if state.status == ComputeStatus::Init {
// If the compute is in Init state, we can't refresh the configuration immediately,
// but we should be able to do that soon.
Ok(())
} else {
Err(anyhow::anyhow!(
"Cannot refresh compute configuration in state {:?}",
state.status
))
}
}
// Wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
@@ -2129,16 +1835,12 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.params.pgdata);
let postgres_port = self.params.connstr.port();
config::write_postgres_conf(
pgdata_path,
&self.params,
&spec,
postgres_port,
self.params.internal_http_port,
tls_config,
spec.databricks_settings.as_ref(),
self.params.lakebase_mode,
)?;
self.pg_reload_conf()?;
@@ -2244,8 +1946,6 @@ impl ComputeNode {
// wait
ComputeStatus::Init
| ComputeStatus::Configuration
| ComputeStatus::RefreshConfiguration
| ComputeStatus::RefreshConfigurationPending
| ComputeStatus::Empty => {
state = self.state_changed.wait(state).unwrap();
}
@@ -2296,17 +1996,7 @@ impl ComputeNode {
pub fn check_for_core_dumps(&self) -> Result<()> {
let core_dump_dir = match std::env::consts::OS {
"macos" => Path::new("/cores/"),
// BEGIN HADRON
// NB: Read core dump files from a fixed location outside of
// the data directory since `compute_ctl` wipes the data directory
// across container restarts.
_ => {
if self.params.lakebase_mode {
Path::new("/databricks/logs/brickstore")
} else {
Path::new(&self.params.pgdata)
}
} // END HADRON
_ => Path::new(&self.params.pgdata),
};
// Collect core dump paths if any
@@ -2380,13 +2070,13 @@ impl ComputeNode {
let result = client
.simple_query(
"SELECT
pg_catalog.row_to_json(pss)
row_to_json(pg_stat_statements)
FROM
public.pg_stat_statements pss
pg_stat_statements
WHERE
pss.userid != 'cloud_admin'::pg_catalog.regrole::pg_catalog.oid
userid != 'cloud_admin'::regrole::oid
ORDER BY
(pss.mean_exec_time + pss.mean_plan_time) DESC
(mean_exec_time + mean_plan_time) DESC
LIMIT 100",
)
.await;
@@ -2514,11 +2204,11 @@ LIMIT 100",
// check the role grants first - to gracefully handle read-replicas.
let select = "SELECT privilege_type
FROM pg_catalog.pg_namespace
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) AS acl ON true
JOIN pg_catalog.pg_user users ON acl.grantee = users.usesysid
WHERE users.usename OPERATOR(pg_catalog.=) $1::pg_catalog.name
AND nspname OPERATOR(pg_catalog.=) $2::pg_catalog.name";
FROM pg_namespace
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
JOIN pg_user users ON acl.grantee = users.usesysid
WHERE users.usename = $1
AND nspname = $2";
let rows = db_client
.query(select, &[role_name, schema_name])
.await
@@ -2587,9 +2277,8 @@ LIMIT 100",
.await
.with_context(|| format!("Failed to execute query: {query}"))?;
} else {
let query = format!(
"CREATE EXTENSION IF NOT EXISTS {ext_name} WITH SCHEMA public VERSION {quoted_version}"
);
let query =
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
db_client
.simple_query(&query)
.await
@@ -2620,7 +2309,7 @@ LIMIT 100",
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
.filter(|s| *s != "neon" && !s.is_empty())
.map(str::to_string)
.collect();
}
@@ -2639,7 +2328,7 @@ LIMIT 100",
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
preload_libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
.filter(|s| *s != "neon" && !s.is_empty())
.map(str::to_string)
.collect();
}
@@ -2692,22 +2381,22 @@ LIMIT 100",
/// The operation will time out after a specified duration.
pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
let state = self.state.lock().unwrap();
let old_pageserver_conninfo = state
let old_pageserver_connstr = state
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_conninfo
.pageserver_connstr
.clone();
let mut unchanged = true;
let _ = self
.state_changed
.wait_timeout_while(state, duration, |s| {
let pageserver_conninfo = &s
let pageserver_connstr = &s
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_conninfo;
unchanged = pageserver_conninfo == &old_pageserver_conninfo;
.pageserver_connstr;
unchanged = pageserver_connstr == &old_pageserver_connstr;
unchanged
})
.unwrap();
@@ -2780,7 +2469,7 @@ LIMIT 100",
// 4. We start again and try to prewarm with the state from 2. instead of the previous complete state
if matches!(
prewarm_state,
LfcPrewarmState::Completed { .. }
LfcPrewarmState::Completed
| LfcPrewarmState::NotPrewarmed
| LfcPrewarmState::Skipped
) {
@@ -2813,34 +2502,6 @@ LIMIT 100",
);
}
}
/// Set the compute spec and update related metrics.
/// This is the central place where pspec is updated.
pub fn set_spec(params: &ComputeNodeParams, state: &mut ComputeState, pspec: ParsedSpec) {
state.pspec = Some(pspec);
ComputeNode::update_attached_metric(params, state);
let _ = logger::update_ids(&params.instance_id, &Some(params.compute_id.clone()));
}
pub fn update_attached_metric(params: &ComputeNodeParams, state: &mut ComputeState) {
// Update the pg_cctl_attached gauge when all identifiers are available.
if let Some(instance_id) = &params.instance_id {
if let Some(pspec) = &state.pspec {
// Clear all values in the metric
COMPUTE_ATTACHED.reset();
// Set new metric value
COMPUTE_ATTACHED
.with_label_values(&[
&params.compute_id,
instance_id,
&pspec.tenant_id.to_string(),
&pspec.timeline_id.to_string(),
])
.set(1);
}
}
}
}
pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
@@ -2965,10 +2626,7 @@ mod tests {
match ParsedSpec::try_from(spec.clone()) {
Ok(_p) => panic!("Failed to detect duplicate entry"),
Err(e) => assert!(
e.to_string()
.starts_with("duplicate entry in safekeeper_connstrings:")
),
Err(e) => assert!(e.starts_with("duplicate entry in safekeeper_connstrings:")),
};
}
}

View File

@@ -7,11 +7,18 @@ use http::StatusCode;
use reqwest::Client;
use std::mem::replace;
use std::sync::Arc;
use std::time::Instant;
use tokio::{io::AsyncReadExt, select, spawn};
use tokio_util::sync::CancellationToken;
use tokio::{io::AsyncReadExt, spawn};
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
pub struct LfcPrewarmStateWithProgress {
#[serde(flatten)]
base: LfcPrewarmState,
total: i32,
prewarmed: i32,
skipped: i32,
}
/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
struct EndpointStoragePair {
url: String,
@@ -20,7 +27,7 @@ struct EndpointStoragePair {
const KEY: &str = "lfc_state";
impl EndpointStoragePair {
/// endpoint_id is set to None while prewarming from other endpoint, see compute_promote.rs
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
/// If not None, takes precedence over pspec.spec.endpoint_id
fn from_spec_and_endpoint(
pspec: &crate::compute::ParsedSpec,
@@ -46,8 +53,36 @@ impl EndpointStoragePair {
}
impl ComputeNode {
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmState {
self.state.lock().unwrap().lfc_prewarm_state.clone()
// If prewarm failed, we want to get overall number of segments as well as done ones.
// However, this function should be reliable even if querying postgres failed.
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
info!("requesting LFC prewarm state from postgres");
let mut state = LfcPrewarmStateWithProgress::default();
{
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
}
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
Ok(client) => client,
Err(err) => {
error!(%err, "connecting to postgres");
return state;
}
};
let row = match client
.query_one("select * from neon.get_prewarm_info()", &[])
.await
{
Ok(row) => row,
Err(err) => {
error!(%err, "querying LFC prewarm status");
return state;
}
};
state.total = row.try_get(0).unwrap_or_default();
state.prewarmed = row.try_get(1).unwrap_or_default();
state.skipped = row.try_get(2).unwrap_or_default();
state
}
pub fn lfc_offload_state(&self) -> LfcOffloadState {
@@ -55,37 +90,36 @@ impl ComputeNode {
}
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
/// Has a failpoint "compute-prewarm"
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
let token: CancellationToken;
{
let state = &mut self.state.lock().unwrap();
token = state.lfc_prewarm_token.clone();
if let LfcPrewarmState::Prewarming =
replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) {
return false;
}
}
crate::metrics::LFC_PREWARMS.inc();
let this = self.clone();
let cloned = self.clone();
spawn(async move {
let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
Ok(state) => state,
let state = match cloned.prewarm_impl(from_endpoint).await {
Ok(true) => LfcPrewarmState::Completed,
Ok(false) => {
info!(
"skipping LFC prewarm because LFC state is not found in endpoint storage"
);
LfcPrewarmState::Skipped
}
Err(err) => {
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "could not prewarm LFC");
let error = format!("{err:#}");
LfcPrewarmState::Failed { error }
LfcPrewarmState::Failed {
error: err.to_string(),
}
}
};
let state = &mut this.state.lock().unwrap();
if let LfcPrewarmState::Cancelled = prewarm_state {
state.lfc_prewarm_token = CancellationToken::new();
}
state.lfc_prewarm_state = prewarm_state;
cloned.state.lock().unwrap().lfc_prewarm_state = state;
});
true
}
@@ -97,101 +131,51 @@ impl ComputeNode {
}
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
async fn prewarm_impl(
&self,
from_endpoint: Option<String>,
token: CancellationToken,
) -> Result<LfcPrewarmState> {
let EndpointStoragePair {
url,
token: storage_token,
} = self.endpoint_storage_pair(from_endpoint)?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
info!(%url, "requesting LFC state from endpoint storage");
let mut now = Instant::now();
let request = Client::new().get(&url).bearer_auth(storage_token);
let response = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
response = request.send() => response
}
.context("querying endpoint storage")?;
match response.status() {
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
let status = res.status();
match status {
StatusCode::OK => (),
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
status => bail!("{status} querying endpoint storage"),
StatusCode::NOT_FOUND => {
return Ok(false);
}
_ => bail!("{status} querying endpoint storage"),
}
let state_download_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let mut uncompressed = Vec::new();
let lfc_state = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
lfc_state = response.bytes() => lfc_state
}
.context("getting request body from endpoint storage")?;
let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
read = decoder.read_to_end(&mut uncompressed) => read
}
.context("decoding LFC state")?;
let uncompress_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let lfc_state = res
.bytes()
.await
.context("getting request body from endpoint storage")?;
ZstdDecoder::new(lfc_state.iter().as_slice())
.read_to_end(&mut uncompressed)
.await
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
// Client connection and prewarm info querying are fast and therefore don't need
// cancellation
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let pg_token = client.cancel_token();
let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
select! {
res = client.query_one("select neon.prewarm_local_cache($1)", &params) => res,
_ = token.cancelled() => {
pg_token.cancel_query(postgres::NoTls).await
.context("cancelling neon.prewarm_local_cache()")?;
return Ok(LfcPrewarmState::Cancelled)
}
}
.context("loading LFC state into postgres")
.map(|_| ())?;
let prewarm_time_ms = now.elapsed().as_millis() as u32;
let row = client
.query_one("select * from neon.get_prewarm_info()", &[])
.context("connecting to postgres")?
.query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
.await
.context("querying prewarm info")?;
let total = row.try_get(0).unwrap_or_default();
let prewarmed = row.try_get(1).unwrap_or_default();
let skipped = row.try_get(2).unwrap_or_default();
.context("loading LFC state into postgres")
.map(|_| ())?;
Ok(LfcPrewarmState::Completed {
total,
prewarmed,
skipped,
state_download_time_ms,
uncompress_time_ms,
prewarm_time_ms,
})
Ok(true)
}
/// If offload request is ongoing, return false, true otherwise
pub fn offload_lfc(self: &Arc<Self>) -> bool {
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if matches!(
replace(state, LfcOffloadState::Offloading),
LfcOffloadState::Offloading
) {
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
return false;
}
}
@@ -203,10 +187,7 @@ impl ComputeNode {
pub async fn offload_lfc_async(self: &Arc<Self>) {
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if matches!(
replace(state, LfcOffloadState::Offloading),
LfcOffloadState::Offloading
) {
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
return;
}
}
@@ -215,69 +196,48 @@ impl ComputeNode {
async fn offload_lfc_with_state_update(&self) {
crate::metrics::LFC_OFFLOADS.inc();
let state = match self.offload_lfc_impl().await {
Ok(state) => state,
Err(err) => {
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC");
let error = format!("{err:#}");
LfcOffloadState::Failed { error }
}
let Err(err) = self.offload_lfc_impl().await else {
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC state to endpoint storage");
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: err.to_string(),
};
self.state.lock().unwrap().lfc_offload_state = state;
}
async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
let mut now = Instant::now();
let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
let mut compressed = Vec::new();
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.get_local_cache_state()", &[])
.await
.context("querying LFC state")?;
let state = row
.try_get::<usize, Option<&[u8]>>(0)
.context("deserializing LFC state")?;
let Some(state) = state else {
info!(%url, "empty LFC state, not exporting");
return Ok(LfcOffloadState::Skipped);
};
let state_query_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let mut compressed = Vec::new();
ZstdEncoder::new(state)
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)
.context("deserializing LFC state")
.map(ZstdEncoder::new)?
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;
let compress_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let compressed_len = compressed.len();
info!(%url, "downloaded LFC state, compressed size {compressed_len}");
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
let request = Client::new().put(url).bearer_auth(token).body(compressed);
let response = request
.send()
.await
.context("writing to endpoint storage")?;
let state_upload_time_ms = now.elapsed().as_millis() as u32;
let status = response.status();
if status != StatusCode::OK {
bail!("request to endpoint storage failed: {status}");
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(()),
Ok(res) => bail!(
"Request to endpoint storage failed with status: {}",
res.status()
),
Err(err) => Err(err).context("writing to endpoint storage"),
}
Ok(LfcOffloadState::Completed {
compress_time_ms,
state_query_time_ms,
state_upload_time_ms,
})
}
pub fn cancel_prewarm(self: &Arc<Self>) {
self.state.lock().unwrap().lfc_prewarm_token.cancel();
}
}

View File

@@ -1,56 +1,69 @@
use crate::compute::ComputeNode;
use anyhow::{Context, bail};
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
use std::time::Instant;
use tracing::info;
use anyhow::{Context, Result, bail};
use compute_api::{
responses::{LfcPrewarmState, PromoteState, SafekeepersLsn},
spec::ComputeMode,
};
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use utils::lsn::Lsn;
impl ComputeNode {
/// Returns only when promote fails or succeeds. If http client calling this function
/// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
/// Returns only when promote fails or succeeds. If a network error occurs
/// and http client disconnects, this does not stop promotion, and subsequent
/// calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
/// Has a failpoint "compute-promotion"
pub async fn promote(self: &std::sync::Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let this = self.clone();
let promote_fn = async move || match this.promote_impl(cfg).await {
Ok(state) => state,
Err(err) => {
tracing::error!(%err, "promoting replica");
let error = format!("{err:#}");
PromoteState::Failed { error }
}
};
pub async fn promote(self: &Arc<Self>, safekeepers_lsn: SafekeepersLsn) -> PromoteState {
let cloned = self.clone();
let start_promotion = || {
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
tokio::spawn(async move { tx.send(promote_fn().await) });
tokio::spawn(async move {
tx.send(match cloned.promote_impl(safekeepers_lsn).await {
Ok(_) => PromoteState::Completed,
Err(err) => {
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: err.to_string(),
}
}
})
});
rx
};
let mut task;
// promote_impl locks self.state so we need to unlock it before calling task.changed()
// self.state is unlocked after block ends so we lock it in promote_impl
// and task.changed() is reached
{
let promote_state = &mut self.state.lock().unwrap().promote_state;
task = promote_state.get_or_insert_with(start_promotion).clone()
}
if task.changed().await.is_err() {
let error = "promote sender dropped".to_string();
return PromoteState::Failed { error };
task = self
.state
.lock()
.unwrap()
.promote_state
.get_or_insert_with(start_promotion)
.clone()
}
task.changed().await.expect("promote sender dropped");
task.borrow().clone()
}
async fn promote_impl(&self, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
// Why do we have to supply safekeepers?
// For secondary we use primary_connection_conninfo so safekeepers field is empty
async fn promote_impl(&self, safekeepers_lsn: SafekeepersLsn) -> Result<()> {
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
if *mode != compute_api::spec::ComputeMode::Replica {
bail!("compute mode \"{}\" is not replica", mode.to_type_str());
if *mode != ComputeMode::Replica {
bail!("{} is not replica", mode.to_type_str());
}
// we don't need to query Postgres so not self.lfc_prewarm_state()
match &state.lfc_prewarm_state {
status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
bail!("compute {status}")
LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
bail!("prewarm not requested or pending")
}
LfcPrewarmState::Failed { error } => {
tracing::warn!(%error, "compute prewarm failed")
tracing::warn!(%error, "replica prewarm failed")
}
_ => {}
}
@@ -59,66 +72,51 @@ impl ComputeNode {
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let mut now = Instant::now();
let primary_lsn = cfg.wal_flush_lsn;
let mut standby_lsn = utils::lsn::Lsn::INVALID;
let primary_lsn = safekeepers_lsn.wal_flush_lsn;
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
let row = client
.query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
.query_one("SELECT pg_last_wal_replay_lsn()", &[])
.await
.context("getting last replay lsn")?;
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
standby_lsn = lsn.into();
if standby_lsn >= primary_lsn {
last_wal_replay_lsn = lsn.into();
if last_wal_replay_lsn >= primary_lsn {
break;
}
info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tracing::info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
sleep(Duration::from_secs(1)).await;
}
if standby_lsn < primary_lsn {
if last_wal_replay_lsn < primary_lsn {
bail!("didn't catch up with primary in {RETRIES} retries");
}
let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
"ALTER SYSTEM SET neon.safekeepers='{}'",
cfg.spec.safekeeper_connstrings.join(",")
safekeepers_lsn.safekeepers
);
client
.query(&safekeepers_sql, &[])
.await
.context("setting safekeepers")?;
client
.query(
"ALTER SYSTEM SET synchronous_standby_names=walproposer",
&[],
)
.await
.context("setting synchronous_standby_names")?;
client
.query("SELECT pg_catalog.pg_reload_conf()", &[])
.query("SELECT pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-promotion", |_| bail!(
"compute-promotion failpoint"
));
let row = client
.query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
.query_one("SELECT * FROM pg_promote()", &[])
.await
.context("pg_promote")?;
if !row.get::<usize, bool>(0) {
bail!("pg_promote() failed");
bail!("pg_promote() returned false");
}
let pg_promote_time_ms = now.elapsed().as_millis() as u32;
let now = Instant::now();
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let row = client
.query_one("SHOW transaction_read_only", &[])
.await
@@ -127,47 +125,8 @@ impl ComputeNode {
bail!("replica in read only mode after promotion");
}
// Already checked validity in http handler
#[allow(unused_mut)]
let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
{
let mut state = self.state.lock().unwrap();
// Local setup has different ports for pg process (port=) for primary and secondary.
// Primary is stopped so we need secondary's "port" value
#[cfg(feature = "testing")]
{
let old_spec = &state.pspec.as_ref().unwrap().spec;
let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
};
let set: std::collections::HashMap<&str, &str> = old_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
};
new_conf.push_str(&format!("port={}\n", set["port"]));
}
tracing::debug!("applied spec: {:#?}", new_pspec.spec);
if self.params.lakebase_mode {
ComputeNode::set_spec(&self.params, &mut state, new_pspec);
} else {
state.pspec = Some(new_pspec);
}
}
info!("applied new spec, reconfiguring as primary");
self.reconfigure()?;
let reconfigure_time_ms = now.elapsed().as_millis() as u32;
Ok(PromoteState::Completed {
lsn_wait_time_ms,
pg_promote_time_ms,
reconfigure_time_ms,
})
let mut state = self.state.lock().unwrap();
state.pspec.as_mut().unwrap().spec.mode = ComputeMode::Primary;
Ok(())
}
}

View File

@@ -7,19 +7,14 @@ use std::io::prelude::*;
use std::path::Path;
use compute_api::responses::TlsConfig;
use compute_api::spec::{
ComputeAudit, ComputeMode, ComputeSpec, DatabricksSettings, GenericOption,
};
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use crate::compute::ComputeNodeParams;
use crate::pg_helpers::{
DatabricksSettingsExt as _, GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize,
escape_conf_value,
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
};
use crate::tls::{self, SERVER_CRT, SERVER_KEY};
use utils::shard::{ShardIndex, ShardNumber};
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
@@ -45,16 +40,12 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
}
/// Create or completely rewrite configuration file specified by `path`
#[allow(clippy::too_many_arguments)]
pub fn write_postgres_conf(
pgdata_path: &Path,
params: &ComputeNodeParams,
spec: &ComputeSpec,
postgres_port: Option<u16>,
extension_server_port: u16,
tls_config: &Option<TlsConfig>,
databricks_settings: Option<&DatabricksSettings>,
lakebase_mode: bool,
) -> Result<()> {
let path = pgdata_path.join("postgresql.conf");
// File::create() destroys the file content if it exists.
@@ -65,78 +56,15 @@ pub fn write_postgres_conf(
writeln!(file, "{conf}")?;
}
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
writeln!(file)?;
if let Some(conninfo) = &spec.pageserver_connection_info {
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = conninfo.stripe_size {
writeln!(
file,
"# from compute spec's pageserver_connection_info.stripe_size field"
)?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
let mut libpq_urls: Option<Vec<String>> = Some(Vec::new());
let num_shards = if conninfo.shard_count.0 == 0 {
1 // unsharded, treat it as a single shard
} else {
conninfo.shard_count.0
};
for shard_number in 0..num_shards {
let shard_index = ShardIndex {
shard_number: ShardNumber(shard_number),
shard_count: conninfo.shard_count,
};
let info = conninfo.shards.get(&shard_index).ok_or_else(|| {
anyhow::anyhow!(
"shard {shard_index} missing from pageserver_connection_info shard map"
)
})?;
let first_pageserver = info
.pageservers
.first()
.expect("must have at least one pageserver");
// Add the libpq URL to the array, or if the URL is missing, reset the array
// forgetting any previous entries. All servers must have a libpq URL, or none
// at all.
if let Some(url) = &first_pageserver.libpq_url {
if let Some(ref mut urls) = libpq_urls {
urls.push(url.clone());
}
} else {
libpq_urls = None
}
}
if let Some(libpq_urls) = libpq_urls {
writeln!(
file,
"# derived from compute spec's pageserver_connection_info field"
)?;
writeln!(
file,
"neon.pageserver_connstring={}",
escape_conf_value(&libpq_urls.join(","))
)?;
} else {
writeln!(file, "# no neon.pageserver_connstring")?;
}
} else {
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "# from compute spec's shard_stripe_size field")?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "# from compute spec's pageserver_connstring field")?;
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
}
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
}
if !spec.safekeeper_connstrings.is_empty() {
let mut neon_safekeepers_value = String::new();
tracing::info!(
@@ -357,24 +285,6 @@ pub fn write_postgres_conf(
writeln!(file, "log_destination='stderr,syslog'")?;
}
if lakebase_mode {
// Explicitly set the port based on the connstr, overriding any previous port setting.
// Note: It is important that we don't specify a different port again after this.
let port = postgres_port.expect("port must be present in connstr");
writeln!(file, "port = {port}")?;
// This is databricks specific settings.
// This should be at the end of the file but before `compute_ctl_temp_override.conf` below
// so that it can override any settings above.
// `compute_ctl_temp_override.conf` is intended to override any settings above during specific operations.
// To prevent potential breakage in the future, we keep it above `compute_ctl_temp_override.conf`.
writeln!(file, "# Databricks settings start")?;
if let Some(settings) = databricks_settings {
writeln!(file, "{}", settings.as_pg_settings())?;
}
writeln!(file, "# Databricks settings end")?;
}
// This is essential to keep this line at the end of the file,
// because it is intended to override any settings above.
writeln!(file, "include_if_exists = 'compute_ctl_temp_override.conf'")?;

View File

@@ -1,40 +1,23 @@
use std::fs::File;
use std::sync::Arc;
use std::thread;
use std::{path::Path, sync::Arc};
use anyhow::Result;
use compute_api::responses::{ComputeConfig, ComputeStatus};
use compute_api::responses::ComputeStatus;
use tracing::{error, info, instrument};
use crate::compute::{ComputeNode, ParsedSpec};
use crate::spec::get_config_from_control_plane;
use crate::compute::ComputeNode;
#[instrument(skip_all)]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let mut state = compute.state.lock().unwrap();
/* BEGIN_HADRON */
// RefreshConfiguration should only be used inside the loop
assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
/* END_HADRON */
if compute.params.lakebase_mode {
while state.status != ComputeStatus::ConfigurationPending
&& state.status != ComputeStatus::RefreshConfigurationPending
&& state.status != ComputeStatus::Failed
{
info!("configurator: compute status: {:?}, sleeping", state.status);
state = compute.state_changed.wait(state).unwrap();
}
} else {
// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
}
// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
}
// Re-check the status after waking up
@@ -54,136 +37,6 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::RefreshConfigurationPending {
info!(
"compute node suspects its configuration is out of date, now refreshing configuration"
);
state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed);
// Drop the lock guard here to avoid holding the lock while downloading config from the control plane / HCC.
// This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it
// is safe to drop the lock like this.
drop(state);
let get_config_result: anyhow::Result<ComputeConfig> =
if let Some(config_path) = &compute.params.config_path_test_only {
// This path is only to make testing easier. In production we always get the config from the HCC.
info!(
"reloading config.json from path: {}",
config_path.to_string_lossy()
);
let path = Path::new(config_path);
if let Ok(file) = File::open(path) {
match serde_json::from_reader::<File, ComputeConfig>(file) {
Ok(config) => Ok(config),
Err(e) => {
error!("could not parse config file: {}", e);
Err(anyhow::anyhow!("could not parse config file: {}", e))
}
}
} else {
error!(
"could not open config file at path: {:?}",
config_path.to_string_lossy()
);
Err(anyhow::anyhow!(
"could not open config file at path: {}",
config_path.to_string_lossy()
))
}
} else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
get_config_from_control_plane(control_plane_uri, &compute.params.compute_id)
} else {
Err(anyhow::anyhow!("config_path_test_only is not set"))
};
// Parse any received ComputeSpec and transpose the result into a Result<Option<ParsedSpec>>.
let parsed_spec_result: Result<Option<ParsedSpec>> =
get_config_result.and_then(|config| {
if let Some(spec) = config.spec {
if let Ok(pspec) = ParsedSpec::try_from(spec) {
Ok(Some(pspec))
} else {
Err(anyhow::anyhow!("could not parse spec"))
}
} else {
Ok(None)
}
});
let new_status: ComputeStatus;
match parsed_spec_result {
// Control plane (HCM) returned a spec and we were able to parse it.
Ok(Some(pspec)) => {
{
let mut state = compute.state.lock().unwrap();
// Defensive programming to make sure this thread is indeed the only one that can move the compute
// node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant
// into the type system.
assert_eq!(state.status, ComputeStatus::RefreshConfiguration);
if state
.pspec
.as_ref()
.map(|ps| ps.pageserver_conninfo.clone())
== Some(pspec.pageserver_conninfo.clone())
{
info!(
"Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
);
state.status = ComputeStatus::Running;
compute.state_changed.notify_all();
drop(state);
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
// state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
// the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
// "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
// but it's not worth forking the codebase too much for this minor point alone right now.
state.pspec = Some(pspec);
}
match compute.reconfigure() {
Ok(_) => {
info!("Refresh configuration: compute node configured");
new_status = ComputeStatus::Running;
}
Err(e) => {
error!(
"Refresh configuration: could not configure compute node: {}",
e
);
// Set the compute node back to the `RefreshConfigurationPending` state if the configuration
// was not successful. It should be okay to treat this situation the same as if the loop
// hasn't executed yet as long as the detection side keeps notifying.
new_status = ComputeStatus::RefreshConfigurationPending;
}
}
}
// Control plane (HCM)'s response does not contain a spec. This is the "Empty" attachment case.
Ok(None) => {
info!(
"Compute Manager signaled that this compute is no longer attached to any storage. Exiting."
);
// We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a
// clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh
// configuration state).
std::process::exit(1);
}
// Various error cases:
// - The request to the control plane (HCM) either failed or returned a malformed spec.
// - compute_ctl itself is configured incorrectly (e.g., compute_id is not set).
Err(e) => {
error!(
"Refresh configuration: error getting a parsed spec: {:?}",
e
);
new_status = ComputeStatus::RefreshConfigurationPending;
// We may be dealing with an overloaded HCM if we end up in this path. Backoff 5 seconds before
// retrying to avoid hammering the HCM.
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");

View File

@@ -1,60 +0,0 @@
use metrics::{
IntCounter, IntGaugeVec, core::Collector, proto::MetricFamily, register_int_counter,
register_int_gauge_vec,
};
use once_cell::sync::Lazy;
// Counter keeping track of the number of PageStream request errors reported by Postgres.
// An error is registered every time Postgres calls compute_ctl's /refresh_configuration API.
// Postgres will invoke this API if it detected trouble with PageStream requests (get_page@lsn,
// get_base_backup, etc.) it sends to any pageserver. An increase in this counter value typically
// indicates Postgres downtime, as PageStream requests are critical for Postgres to function.
pub static POSTGRES_PAGESTREAM_REQUEST_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_pagestream_request_errors_total",
"Number of PageStream request errors reported by the postgres process"
)
.expect("failed to define a metric")
});
// Counter keeping track of the number of compute configuration errors due to Postgres statement
// timeouts. An error is registered every time `ComputeNode::reconfigure()` fails due to Postgres
// error code 57014 (query cancelled). This statement timeout typically occurs when postgres is
// stuck in a problematic retry loop when the PS is reject its connection requests (usually due
// to PG pointing at the wrong PS). We should investigate the root cause when this counter value
// increases by checking PG and PS logs.
pub static COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_configure_statement_timeout_errors_total",
"Number of compute configuration errors due to Postgres statement timeouts."
)
.expect("failed to define a metric")
});
pub static COMPUTE_ATTACHED: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pg_cctl_attached",
"Compute node attached status (1 if attached)",
&[
"pg_compute_id",
"pg_instance_id",
"tenant_id",
"timeline_id"
]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = Vec::new();
metrics.extend(POSTGRES_PAGESTREAM_REQUEST_ERRORS.collect());
metrics.extend(COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.collect());
metrics.extend(COMPUTE_ATTACHED.collect());
metrics
}
pub fn initialize_metrics() {
Lazy::force(&POSTGRES_PAGESTREAM_REQUEST_ERRORS);
Lazy::force(&COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS);
Lazy::force(&COMPUTE_ATTACHED);
}

View File

@@ -16,29 +16,13 @@ use crate::http::JsonResponse;
#[derive(Clone, Debug)]
pub(in crate::http) struct Authorize {
compute_id: String,
// BEGIN HADRON
// Hadron instance ID. Only set if it's a Lakebase V1 a.k.a. Hadron instance.
instance_id: Option<String>,
// END HADRON
jwks: JwkSet,
validation: Validation,
}
impl Authorize {
pub fn new(compute_id: String, instance_id: Option<String>, jwks: JwkSet) -> Self {
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
let mut validation = Validation::new(Algorithm::EdDSA);
// BEGIN HADRON
let use_rsa = jwks.keys.iter().any(|jwk| {
jwk.common
.key_algorithm
.is_some_and(|alg| alg == jsonwebtoken::jwk::KeyAlgorithm::RS256)
});
if use_rsa {
validation = Validation::new(Algorithm::RS256);
}
// END HADRON
validation.validate_exp = true;
// Unused by the control plane
validation.validate_nbf = false;
@@ -50,7 +34,6 @@ impl Authorize {
Self {
compute_id,
instance_id,
jwks,
validation,
}
@@ -64,20 +47,10 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
fn authorize(&mut self, mut request: Request<Body>) -> Self::Future {
let compute_id = self.compute_id.clone();
let is_hadron_instance = self.instance_id.is_some();
let jwks = self.jwks.clone();
let validation = self.validation.clone();
Box::pin(async move {
// BEGIN HADRON
// In Hadron deployments the "external" HTTP endpoint on compute_ctl can only be
// accessed by trusted components (enforced by dblet network policy), so we can bypass
// all auth here.
if is_hadron_instance {
return Ok(request);
}
// END HADRON
let TypedHeader(Authorization(bearer)) = request
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
.await

View File

@@ -96,7 +96,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeSchemaWithLsn"
$ref: "#/components/schemas/SafekeepersLsn"
responses:
200:
description: Promote succeeded or wasn't started
@@ -139,15 +139,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/LfcPrewarmState"
delete:
tags:
- Prewarm
summary: Cancel ongoing LFC prewarm
description: ""
operationId: cancelLfcPrewarm
responses:
202:
description: Prewarm cancelled
/lfc/offload:
post:
@@ -306,7 +297,14 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeSchema"
type: object
required:
- spec
properties:
spec:
# XXX: I don't want to explain current spec in the OpenAPI format,
# as it could be changed really soon. Consider doing it later.
type: object
responses:
200:
description: Compute configuration finished.
@@ -593,30 +591,26 @@ components:
type: string
example: "1.0.0"
ComputeSchema:
SafekeepersLsn:
type: object
required:
- spec
properties:
spec:
type: object
ComputeSchemaWithLsn:
type: object
required:
- spec
- safekeepers
- wal_flush_lsn
properties:
spec:
$ref: "#/components/schemas/ComputeState"
wal_flush_lsn:
safekeepers:
description: Primary replica safekeepers
type: string
wal_flush_lsn:
description: Primary last WAL flush LSN
type: string
description: "last WAL flush LSN"
example: "0/028F10D8"
LfcPrewarmState:
type: object
required:
- status
- total
- prewarmed
- skipped
properties:
status:
description: LFC prewarm status
@@ -634,15 +628,6 @@ components:
skipped:
description: Pages processed but not prewarmed
type: integer
state_download_time_ms:
description: Time it takes to download LFC state to compute
type: integer
uncompress_time_ms:
description: Time it takes to uncompress LFC state
type: integer
prewarm_time_ms:
description: Time it takes to prewarm LFC state in Postgres
type: integer
LfcOffloadState:
type: object
@@ -651,21 +636,11 @@ components:
properties:
status:
description: LFC offload status
enum: [not_offloaded, offloading, completed, skipped, failed]
enum: [not_offloaded, offloading, completed, failed]
type: string
error:
description: LFC offload error, if any
type: string
state_query_time_ms:
description: Time it takes to get LFC state from Postgres
type: integer
compress_time_ms:
description: Time it takes to compress LFC state
type: integer
state_upload_time_ms:
description: Time it takes to upload LFC state to endpoint storage
type: integer
PromoteState:
type: object
@@ -679,15 +654,6 @@ components:
error:
description: Promote error, if any
type: string
lsn_wait_time_ms:
description: Time it takes for secondary to catch up with primary WAL flush LSN
type: integer
pg_promote_time_ms:
description: Time it takes to call pg_promote on secondary
type: integer
reconfigure_time_ms:
description: Time it takes to reconfigure promoted secondary
type: integer
SetRoleGrantsRequest:
type: object

View File

@@ -43,12 +43,7 @@ pub(in crate::http) async fn configure(
// configure request for tracing purposes.
state.startup_span = Some(tracing::Span::current());
if compute.params.lakebase_mode {
ComputeNode::set_spec(&compute.params, &mut state, pspec);
} else {
state.pspec = Some(pspec);
}
state.pspec = Some(pspec);
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
drop(state);
}

View File

@@ -1,34 +0,0 @@
use crate::pg_isready::pg_isready;
use crate::{compute::ComputeNode, http::JsonResponse};
use axum::{extract::State, http::StatusCode, response::Response};
use std::sync::Arc;
/// NOTE: NOT ENABLED YET
/// Detect if the compute is alive.
/// Called by the liveness probe of the compute container.
pub(in crate::http) async fn hadron_liveness_probe(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
let port = match compute.params.connstr.port() {
Some(port) => port,
None => {
return JsonResponse::error(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to get the port from the connection string",
);
}
};
match pg_isready(&compute.params.pg_isready_bin, port) {
Ok(_) => {
// The connection is successful, so the compute is alive.
// Return a 200 OK response.
JsonResponse::success(StatusCode::OK, "ok")
}
Err(e) => {
tracing::error!("Hadron liveness probe failed: {}", e);
// The connection failed, so the compute is not alive.
// Return a 500 Internal Server Error response.
JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
}
}
}

View File

@@ -1,11 +1,12 @@
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use axum_extra::extract::OptionalQuery;
use compute_api::responses::{LfcOffloadState, LfcPrewarmState};
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmState> {
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
Json(compute.lfc_prewarm_state().await)
}
@@ -45,8 +46,3 @@ pub(in crate::http) async fn offload(compute: Compute) -> Response {
)
}
}
pub(in crate::http) async fn cancel_prewarm(compute: Compute) -> StatusCode {
compute.cancel_prewarm();
StatusCode::ACCEPTED
}

View File

@@ -1,19 +1,10 @@
use std::path::Path;
use std::sync::Arc;
use anyhow::Context;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use http_body_util::BodyExt;
use hyper::{Request, StatusCode};
use metrics::proto::MetricFamily;
use metrics::{Encoder, TextEncoder};
use crate::communicator_socket_client::connect_communicator_socket;
use crate::compute::ComputeNode;
use crate::hadron_metrics;
use crate::http::JsonResponse;
use crate::metrics::collect;
@@ -22,18 +13,11 @@ pub(in crate::http) async fn get_metrics() -> Response {
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let mut metrics = collect()
let metrics = collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
// Add Hadron metrics.
let hadron_metrics: Vec<MetricFamily> = hadron_metrics::collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect();
metrics.extend(hadron_metrics);
let encoder = TextEncoder::new();
let mut buffer = vec![];
@@ -47,42 +31,3 @@ pub(in crate::http) async fn get_metrics() -> Response {
.body(Body::from(buffer))
.unwrap()
}
/// Fetch and forward metrics from the Postgres neon extension's metrics
/// exporter that are used by autoscaling-agent.
///
/// The neon extension exposes these metrics over a Unix domain socket
/// in the data directory. That's not accessible directly from the outside
/// world, so we have this endpoint in compute_ctl to expose it
pub(in crate::http) async fn get_autoscaling_metrics(
State(compute): State<Arc<ComputeNode>>,
) -> Result<Response, Response> {
let pgdata = Path::new(&compute.params.pgdata);
// Connect to the communicator process's metrics socket
let mut metrics_client = connect_communicator_socket(pgdata)
.await
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Make a request for /autoscaling_metrics
let request = Request::builder()
.method("GET")
.uri("/autoscaling_metrics")
.header("Host", "localhost") // hyper requires Host, even though the server won't care
.body(Body::from(""))
.unwrap();
let resp = metrics_client
.send_request(request)
.await
.context("fetching metrics from Postgres metrics service")
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Build a response that just forwards the response we got.
let mut response = Response::builder();
response = response.status(resp.status());
if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
response = response.header(CONTENT_TYPE, content_type);
}
let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream());
Ok(response.body(body).unwrap())
}

View File

@@ -10,13 +10,11 @@ pub(in crate::http) mod extension_server;
pub(in crate::http) mod extensions;
pub(in crate::http) mod failpoints;
pub(in crate::http) mod grants;
pub(in crate::http) mod hadron_liveness_probe;
pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod promote;
pub(in crate::http) mod refresh_configuration;
pub(in crate::http) mod status;
pub(in crate::http) mod terminate;

View File

@@ -1,25 +1,14 @@
use crate::http::JsonResponse;
use axum::extract::Json;
use compute_api::responses::PromoteConfig;
use axum::Form;
use http::StatusCode;
pub(in crate::http) async fn promote(
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
Json(cfg): Json<PromoteConfig>,
Form(safekeepers_lsn): Form<compute_api::responses::SafekeepersLsn>,
) -> axum::response::Response {
// Return early at the cost of extra parsing spec
let pspec = match crate::compute::ParsedSpec::try_from(cfg.spec) {
Ok(p) => p,
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
};
let cfg = PromoteConfig {
spec: pspec.spec,
wal_flush_lsn: cfg.wal_flush_lsn,
};
let state = compute.promote(cfg).await;
if let compute_api::responses::PromoteState::Failed { error: _ } = state {
return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);
let state = compute.promote(safekeepers_lsn).await;
if let compute_api::responses::PromoteState::Failed { error } = state {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, error);
}
JsonResponse::success(StatusCode::OK, state)
}

View File

@@ -1,29 +0,0 @@
// This file is added by Hadron
use std::sync::Arc;
use axum::{
extract::State,
response::{IntoResponse, Response},
};
use http::StatusCode;
use crate::compute::ComputeNode;
use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
use crate::http::JsonResponse;
/// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
/// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
/// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
/// configuration to be reloaded in a best effort manner. Invocation of this method does not
/// guarantee that a reconfiguration will occur. The caller should consider keep sending this
/// request while it believes that the compute configuration is out of date.
pub(in crate::http) async fn refresh_configuration(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc();
match compute.signal_refresh_configuration().await {
Ok(_) => StatusCode::OK.into_response(),
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
}
}

View File

@@ -1,7 +1,7 @@
use crate::compute::{ComputeNode, forward_termination_signal};
use crate::http::JsonResponse;
use axum::extract::State;
use axum::response::{IntoResponse, Response};
use axum::response::Response;
use axum_extra::extract::OptionalQuery;
use compute_api::responses::{ComputeStatus, TerminateMode, TerminateResponse};
use http::StatusCode;
@@ -33,29 +33,7 @@ pub(in crate::http) async fn terminate(
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
return JsonResponse::invalid_status(state.status);
}
// If compute is Empty, there's no Postgres to terminate. The regular compute_ctl termination path
// assumes Postgres to be configured and running, so we just special-handle this case by exiting
// the process directly.
if compute.params.lakebase_mode && state.status == ComputeStatus::Empty {
drop(state);
info!("terminating empty compute - will exit process");
// Queue a task to exit the process after 5 seconds. The 5-second delay aims to
// give enough time for the HTTP response to be sent so that HCM doesn't get an abrupt
// connection termination.
tokio::spawn(async {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
info!("exiting process after terminating empty compute");
std::process::exit(0);
});
return StatusCode::OK.into_response();
}
// For Running status, proceed with normal termination
state.set_status(mode.into(), &compute.state_changed);
drop(state);
}
forward_termination_signal(false);

View File

@@ -23,8 +23,7 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, hadron_liveness_probe, insights, lfc, metrics, metrics_json, promote,
refresh_configuration, status, terminate,
grants, insights, lfc, metrics, metrics_json, promote, status, terminate,
},
};
use crate::compute::ComputeNode;
@@ -44,7 +43,6 @@ pub enum Server {
port: u16,
config: ComputeCtlConfig,
compute_id: String,
instance_id: Option<String>,
},
}
@@ -69,12 +67,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
post(extension_server::download_extension),
)
.route("/extensions", post(extensions::install_extension))
.route("/grants", post(grants::add_grant))
// Hadron: Compute-initiated configuration refresh
.route(
"/refresh_configuration",
post(refresh_configuration::refresh_configuration),
);
.route("/grants", post(grants::add_grant));
// Add in any testing support
if cfg!(feature = "testing") {
@@ -86,25 +79,13 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
router
}
Server::External {
config,
compute_id,
instance_id,
..
config, compute_id, ..
} => {
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/metrics", get(metrics::get_metrics))
.route(
"/autoscaling_metrics",
get(metrics::get_autoscaling_metrics),
);
let unauthenticated_router =
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route(
"/lfc/prewarm",
get(lfc::prewarm_state)
.post(lfc::prewarm)
.delete(lfc::cancel_prewarm),
)
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))
@@ -115,13 +96,8 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
.route("/metrics.json", get(metrics_json::get_metrics))
.route("/status", get(status::get_status))
.route("/terminate", post(terminate::terminate))
.route(
"/hadron_liveness_probe",
get(hadron_liveness_probe::hadron_liveness_probe),
)
.layer(AsyncRequireAuthorizationLayer::new(Authorize::new(
compute_id.clone(),
instance_id.clone(),
config.jwks.clone(),
)));

View File

@@ -2,7 +2,6 @@ use std::collections::HashMap;
use anyhow::Result;
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use once_cell::sync::Lazy;
use tokio_postgres::error::Error as PostgresError;
use tokio_postgres::{Client, Config, NoTls};
@@ -19,7 +18,7 @@ async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
.query(
"SELECT datname FROM pg_catalog.pg_database
WHERE datallowconn
AND datconnlimit OPERATOR(pg_catalog.<>) (OPERATOR(pg_catalog.-) 2::pg_catalog.int4)
AND datconnlimit <> - 2
LIMIT 500",
&[],
)
@@ -67,7 +66,7 @@ pub async fn get_installed_extensions(
let extensions: Vec<(String, String, i32)> = client
.query(
"SELECT extname, extversion, extowner::pg_catalog.int4 FROM pg_catalog.pg_extension",
"SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
&[],
)
.await?
@@ -120,7 +119,3 @@ pub async fn get_installed_extensions(
extensions: extensions_map.into_values().collect(),
})
}
pub fn initialize_metrics() {
Lazy::force(&INSTALLED_EXTENSIONS);
}

View File

@@ -4,7 +4,6 @@
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checker;
pub mod communicator_socket_client;
pub mod config;
pub mod configurator;
pub mod http;
@@ -16,7 +15,6 @@ pub mod compute_prewarm;
pub mod compute_promote;
pub mod disk_quota;
pub mod extension_server;
pub mod hadron_metrics;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
@@ -25,7 +23,6 @@ mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pg_isready;
pub mod pgbouncer;
pub mod rsyslog;
pub mod spec;

View File

@@ -1,10 +1,7 @@
use std::collections::HashMap;
use std::sync::{LazyLock, RwLock};
use tracing::Subscriber;
use tracing::info;
use tracing_appender;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};
/// Initialize logging to stderr, and OpenTelemetry tracing and exporter.
///
@@ -16,63 +13,31 @@ use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};
/// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
/// `tracing-utils` package description.
///
pub fn init_tracing_and_logging(
default_log_level: &str,
log_dir_opt: &Option<String>,
) -> anyhow::Result<(
Option<tracing_utils::Provider>,
Option<tracing_appender::non_blocking::WorkerGuard>,
)> {
pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
// Initialize Logging
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
// Standard output streams
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.with_writer(std::io::stderr);
// Logs with file rotation. Files in `$log_dir/pgcctl.yyyy-MM-dd`
let (json_to_file_layer, _file_logs_guard) = if let Some(log_dir) = log_dir_opt {
std::fs::create_dir_all(log_dir)?;
let file_logs_appender = tracing_appender::rolling::RollingFileAppender::builder()
.rotation(tracing_appender::rolling::Rotation::DAILY)
.filename_prefix("pgcctl")
// Lib appends to existing files, so we will keep files for up to 2 days even on restart loops.
// At minimum, log-daemon will have 1 day to detect and upload a file (if created right before midnight).
.max_log_files(2)
.build(log_dir)
.expect("Initializing rolling file appender should succeed");
let (file_logs_writer, _file_logs_guard) =
tracing_appender::non_blocking(file_logs_appender);
let json_to_file_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.event_format(PgJsonLogShapeFormatter)
.with_writer(file_logs_writer);
(Some(json_to_file_layer), Some(_file_logs_guard))
} else {
(None, None)
};
// Initialize OpenTelemetry
let provider =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default());
let otlp_layer = provider.as_ref().map(tracing_utils::layer);
let otlp_layer =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await;
// Put it all together
tracing_subscriber::registry()
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
.with(json_to_file_layer)
.init();
tracing::info!("logging and tracing started");
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
Ok((provider, _file_logs_guard))
Ok(())
}
/// Replace all newline characters with a special character to make it
@@ -127,157 +92,3 @@ pub fn startup_context_from_env() -> Option<opentelemetry::Context> {
None
}
}
/// Track relevant id's
const UNKNOWN_IDS: &str = r#""pg_instance_id": "", "pg_compute_id": """#;
static IDS: LazyLock<RwLock<String>> = LazyLock::new(|| RwLock::new(UNKNOWN_IDS.to_string()));
pub fn update_ids(instance_id: &Option<String>, compute_id: &Option<String>) -> anyhow::Result<()> {
let ids = format!(
r#""pg_instance_id": "{}", "pg_compute_id": "{}""#,
instance_id.as_ref().map(|s| s.as_str()).unwrap_or_default(),
compute_id.as_ref().map(|s| s.as_str()).unwrap_or_default()
);
let mut guard = IDS
.write()
.map_err(|e| anyhow::anyhow!("Log set id's rwlock poisoned: {}", e))?;
*guard = ids;
Ok(())
}
/// Massage compute_ctl logs into PG json log shape so we can use the same Lumberjack setup.
struct PgJsonLogShapeFormatter;
impl<S, N> fmt::format::FormatEvent<S, N> for PgJsonLogShapeFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> fmt::format::FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &fmt::FmtContext<'_, S, N>,
mut writer: fmt::format::Writer<'_>,
event: &tracing::Event<'_>,
) -> std::fmt::Result {
// Format values from the event's metadata, and open message string
let metadata = event.metadata();
{
let ids_guard = IDS.read();
let ids = ids_guard
.as_ref()
.map(|guard| guard.as_str())
// Surpress so that we don't lose all uploaded/ file logs if something goes super wrong. We would notice the missing id's.
.unwrap_or(UNKNOWN_IDS);
write!(
&mut writer,
r#"{{"timestamp": "{}", "error_severity": "{}", "file_name": "{}", "backend_type": "compute_ctl_self", {}, "message": "#,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f GMT"),
metadata.level(),
metadata.target(),
ids
)?;
}
let mut message = String::new();
let message_writer = fmt::format::Writer::new(&mut message);
// Gather the message
ctx.field_format().format_fields(message_writer, event)?;
// TODO: any better options than to copy-paste this OSS span formatter?
// impl<S, N, T> FormatEvent<S, N> for Format<Full, T>
// https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/trait.FormatEvent.html#impl-FormatEvent%3CS,+N%3E-for-Format%3CFull,+T%3E
// write message, close bracket, and new line
writeln!(writer, "{}}}", serde_json::to_string(&message).unwrap())
}
}
#[cfg(feature = "testing")]
#[cfg(test)]
mod test {
use super::*;
use std::{cell::RefCell, io};
// Use thread_local! instead of Mutex for test isolation
thread_local! {
static WRITER_OUTPUT: RefCell<String> = const { RefCell::new(String::new()) };
}
#[derive(Clone, Default)]
struct StaticStringWriter;
impl io::Write for StaticStringWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let output = String::from_utf8(buf.to_vec()).expect("Invalid UTF-8 in test output");
WRITER_OUTPUT.with(|s| s.borrow_mut().push_str(&output));
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl fmt::MakeWriter<'_> for StaticStringWriter {
type Writer = Self;
fn make_writer(&self) -> Self::Writer {
Self
}
}
#[test]
fn test_log_pg_json_shape_formatter() {
// Use a scoped subscriber to prevent global state pollution
let subscriber = tracing_subscriber::registry().with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.event_format(PgJsonLogShapeFormatter)
.with_writer(StaticStringWriter),
);
let _ = update_ids(&Some("000".to_string()), &Some("111".to_string()));
// Clear any previous test state
WRITER_OUTPUT.with(|s| s.borrow_mut().clear());
let messages = [
"test message",
r#"json escape check: name="BatchSpanProcessor.Flush.ExportError" reason="Other(reqwest::Error { kind: Request, url: \"http://localhost:4318/v1/traces\", source: hyper_
util::client::legacy::Error(Connect, ConnectError(\"tcp connect error\", Os { code: 111, kind: ConnectionRefused, message: \"Connection refused\" })) })" Failed during the export process"#,
];
tracing::subscriber::with_default(subscriber, || {
for message in messages {
tracing::info!(message);
}
});
tracing::info!("not test message");
// Get captured output
let output = WRITER_OUTPUT.with(|s| s.borrow().clone());
let json_strings: Vec<&str> = output.lines().collect();
assert_eq!(
json_strings.len(),
messages.len(),
"Log didn't have the expected number of json strings."
);
let json_string_shape_regex = regex::Regex::new(
r#"\{"timestamp": "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} GMT", "error_severity": "INFO", "file_name": ".+", "backend_type": "compute_ctl_self", "pg_instance_id": "000", "pg_compute_id": "111", "message": ".+"\}"#
).unwrap();
for (i, expected_message) in messages.iter().enumerate() {
let json_string = json_strings[i];
assert!(
json_string_shape_regex.is_match(json_string),
"Json log didn't match expected pattern:\n{json_string}",
);
let parsed_json: serde_json::Value = serde_json::from_str(json_string).unwrap();
let actual_message = parsed_json["message"].as_str().unwrap();
assert_eq!(*expected_message, actual_message);
}
}
}

View File

@@ -4,13 +4,14 @@ use std::thread;
use std::time::{Duration, SystemTime};
use anyhow::{Result, bail};
use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverProtocol};
use compute_api::spec::{ComputeMode, PageserverProtocol};
use itertools::Itertools as _;
use pageserver_page_api as page_api;
use postgres::{NoTls, SimpleQueryMessage};
use tracing::{info, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use utils::shard::{ShardCount, ShardNumber, TenantShardId};
use crate::compute::ComputeNode;
@@ -77,16 +78,17 @@ fn acquire_lsn_lease_with_retry(
loop {
// Note: List of pageservers is dynamic, need to re-read configs before each attempt.
let (conninfo, auth) = {
let (connstrings, auth) = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
(
spec.pageserver_conninfo.clone(),
spec.pageserver_connstr.clone(),
spec.storage_auth_token.clone(),
)
};
let result = try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn);
let result =
try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
match result {
Ok(Some(res)) => {
return Ok(res);
@@ -110,44 +112,35 @@ fn acquire_lsn_lease_with_retry(
/// Tries to acquire LSN leases on all Pageserver shards.
fn try_acquire_lsn_lease(
conninfo: PageserverConnectionInfo,
connstrings: &str,
auth: Option<&str>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let connstrings = connstrings.split(',').collect_vec();
let shard_count = connstrings.len();
let mut leases = Vec::new();
for (shard_index, shard) in conninfo.shards.into_iter() {
let tenant_shard_id = TenantShardId {
tenant_id,
shard_number: shard_index.shard_number,
shard_count: shard_index.shard_count,
for (shard_number, &connstring) in connstrings.iter().enumerate() {
let tenant_shard_id = match shard_count {
0 | 1 => TenantShardId::unsharded(tenant_id),
shard_count => TenantShardId {
tenant_id,
shard_number: ShardNumber(shard_number as u8),
shard_count: ShardCount::new(shard_count as u8),
},
};
// XXX: If there are more than pageserver for the one shard, do we need to get a
// leas on all of them? Currently, that's what we assume, but this is hypothetical
// as of this writing, as we never pass the info for more than one pageserver per
// shard.
for pageserver in shard.pageservers {
let lease = match conninfo.prefer_protocol {
PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
&pageserver.grpc_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?,
PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
&pageserver.libpq_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?,
};
leases.push(lease);
}
let lease = match PageserverProtocol::from_connstring(connstring)? {
PageserverProtocol::Libpq => {
acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
PageserverProtocol::Grpc => {
acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
};
leases.push(lease);
}
Ok(leases.into_iter().min().flatten())

View File

@@ -9,20 +9,15 @@ use crate::metrics::DB_MIGRATION_FAILED;
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
lakebase_mode: bool,
}
impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str], lakebase_mode: bool) -> Self {
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);
Self {
client,
migrations,
lakebase_mode,
}
Self { client, migrations }
}
/// Get the current value neon_migration.migration_id
@@ -76,7 +71,7 @@ impl<'m> MigrationRunner<'m> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
.await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key pg_catalog.int4 NOT NULL PRIMARY KEY, id pg_catalog.int8 NOT NULL DEFAULT 0)").await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
self.client
.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
@@ -135,13 +130,8 @@ impl<'m> MigrationRunner<'m> {
// ID is also the next index
let migration_id = (current_migration + 1) as i64;
let migration = self.migrations[current_migration];
let migration = if self.lakebase_mode {
migration.replace("neon_superuser", "databricks_superuser")
} else {
migration.to_string()
};
match Self::run_migration(self.client, migration_id, &migration).await {
match Self::run_migration(self.client, migration_id, migration).await {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}

View File

@@ -15,17 +15,17 @@ DO $$
DECLARE
role_name text;
BEGIN
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member')
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, '{privileged_role_name}', 'member')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', pg_catalog.quote_ident(role_name);
EXECUTE pg_catalog.format('ALTER ROLE %I INHERIT;', role_name);
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
END LOOP;
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles
FOR role_name IN SELECT rolname FROM pg_roles
WHERE
NOT pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT pg_catalog.starts_with(rolname, 'pg_')
NOT pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT starts_with(rolname, 'pg_')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', pg_catalog.quote_ident(role_name);
EXECUTE pg_catalog.format('ALTER ROLE %I NOBYPASSRLS;', role_name);
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';
END LOOP;
END $$;

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name = 'server_version_num') THEN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
EXECUTE 'GRANT pg_create_subscription TO {privileged_role_name}';
END IF;
END $$;

View File

@@ -5,9 +5,9 @@ DO $$
DECLARE
role_name TEXT;
BEGIN
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE rolreplication IS TRUE
FOR role_name IN SELECT rolname FROM pg_roles WHERE rolreplication IS TRUE
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', pg_catalog.quote_ident(role_name);
EXECUTE pg_catalog.format('ALTER ROLE %I NOREPLICATION;', role_name);
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOREPLICATION';
END LOOP;
END $$;

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name OPERATOR(pg_catalog.=) 'server_version_num'::pg_catalog.text) THEN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_export_snapshot TO {privileged_role_name}';
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_log_standby_snapshot TO {privileged_role_name}';
END IF;

View File

@@ -2,7 +2,7 @@ DO $$
DECLARE
bypassrls boolean;
BEGIN
SELECT rolbypassrls INTO bypassrls FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser';
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
IF NOT bypassrls THEN
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
END IF;

View File

@@ -4,8 +4,8 @@ DECLARE
BEGIN
FOR role IN
SELECT rolname AS name, rolinherit AS inherit
FROM pg_catalog.pg_roles
WHERE pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
FROM pg_roles
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
IF NOT role.inherit THEN
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
@@ -14,12 +14,12 @@ BEGIN
FOR role IN
SELECT rolname AS name, rolbypassrls AS bypassrls
FROM pg_catalog.pg_roles
WHERE NOT pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT pg_catalog.starts_with(rolname, 'pg_')
FROM pg_roles
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT starts_with(rolname, 'pg_')
LOOP
IF role.bypassrls THEN
RAISE EXCEPTION '% can bypass RLS', pg_catalog.quote_ident(role.name);
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
END IF;
END LOOP;
END $$;

View File

@@ -1,10 +1,10 @@
DO $$
BEGIN
IF (SELECT pg_catalog.current_setting('server_version_num')::pg_catalog.numeric < 160000) THEN
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
RETURN;
END IF;
IF NOT (SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
END IF;
END $$;

View File

@@ -2,12 +2,12 @@ DO $$
DECLARE
monitor record;
BEGIN
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
admin_option AS admin
INTO monitor
FROM pg_catalog.pg_auth_members
WHERE roleid = 'pg_monitor'::pg_catalog.regrole
AND member = 'neon_superuser'::pg_catalog.regrole;
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'neon_superuser'::regrole;
IF monitor IS NULL THEN
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor';

View File

@@ -2,11 +2,11 @@ DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT pg_catalog.bool_and(pg_catalog.has_function_privilege('neon_superuser', oid, 'execute'))
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
INTO can_execute
FROM pg_catalog.pg_proc
FROM pg_proc
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
AND pronamespace = 'pg_catalog'::pg_catalog.regnamespace;
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
END IF;

View File

@@ -2,9 +2,9 @@ DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT pg_catalog.has_function_privilege('neon_superuser', oid, 'execute')
SELECT has_function_privilege('neon_superuser', oid, 'execute')
INTO can_execute
FROM pg_catalog.pg_proc
FROM pg_proc
WHERE proname = 'pg_show_replication_origin_status'
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN

View File

@@ -2,10 +2,10 @@ DO $$
DECLARE
signal_backend record;
BEGIN
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
admin_option AS admin
INTO signal_backend
FROM pg_catalog.pg_auth_members
FROM pg_auth_members
WHERE roleid = 'pg_signal_backend'::regrole
AND member = 'neon_superuser'::regrole;

View File

@@ -11,7 +11,6 @@ use tracing::{Level, error, info, instrument, span};
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
const PG_DEFAULT_INIT_TIMEOUIT: Duration = Duration::from_secs(60);
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
/// Struct to store runtime state of the compute monitor thread.
@@ -353,47 +352,13 @@ impl ComputeMonitor {
// Hang on condition variable waiting until the compute status is `Running`.
fn wait_for_postgres_start(compute: &ComputeNode) {
let mut state = compute.state.lock().unwrap();
let pg_init_timeout = compute
.params
.pg_init_timeout
.unwrap_or(PG_DEFAULT_INIT_TIMEOUIT);
while state.status != ComputeStatus::Running {
info!("compute is not running, waiting before monitoring activity");
if !compute.params.lakebase_mode {
state = compute.state_changed.wait(state).unwrap();
state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::Running {
break;
}
continue;
if state.status == ComputeStatus::Running {
break;
}
if state.pg_start_time.is_some()
&& Utc::now()
.signed_duration_since(state.pg_start_time.unwrap())
.to_std()
.unwrap_or_default()
> pg_init_timeout
{
// If Postgres isn't up and running with working PS/SK connections within POSTGRES_STARTUP_TIMEOUT, it is
// possible that we started Postgres with a wrong spec (so it is talking to the wrong PS/SK nodes). To prevent
// deadends we simply exit (panic) the compute node so it can restart with the latest spec.
//
// NB: We skip this check if we have not attempted to start PG yet (indicated by state.pg_start_up == None).
// This is to make sure the more appropriate errors are surfaced if we encounter issues before we even attempt
// to start PG (e.g., if we can't pull the spec, can't sync safekeepers, or can't get the basebackup).
error!(
"compute did not enter Running state in {} seconds, exiting",
pg_init_timeout.as_secs()
);
std::process::exit(1);
}
state = compute
.state_changed
.wait_timeout(state, Duration::from_secs(5))
.unwrap()
.0;
}
}
@@ -407,9 +372,9 @@ fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
// like `postgres_exporter` use it to query Postgres statistics.
// Use explicit 8 bytes type casts to match Rust types.
let stats = cli.query_one(
"SELECT pg_catalog.coalesce(pg_catalog.sum(active_time), 0.0)::pg_catalog.float8 AS total_active_time,
pg_catalog.coalesce(pg_catalog.sum(sessions), 0)::pg_catalog.bigint AS total_sessions
FROM pg_catalog.pg_stat_database
"SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
coalesce(sum(sessions), 0)::bigint AS total_sessions
FROM pg_stat_database
WHERE datname NOT IN (
'postgres',
'template0',
@@ -445,11 +410,11 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
let mut last_active: Option<DateTime<Utc>> = None;
// Get all running client backends except ourself, use RFC3339 DateTime format.
let backends = cli.query(
"SELECT state, pg_catalog.to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"'::pg_catalog.text) AS state_change
"SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
FROM pg_stat_activity
WHERE backend_type OPERATOR(pg_catalog.=) 'client backend'::pg_catalog.text
AND pid OPERATOR(pg_catalog.!=) pg_catalog.pg_backend_pid()
AND usename OPERATOR(pg_catalog.!=) 'cloud_admin'::pg_catalog.name;", // XXX: find a better way to filter other monitors?
WHERE backend_type = 'client backend'
AND pid != pg_backend_pid()
AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
&[],
);

View File

@@ -11,9 +11,7 @@ use std::time::{Duration, Instant};
use anyhow::{Result, bail};
use compute_api::responses::TlsConfig;
use compute_api::spec::{
Database, DatabricksSettings, GenericOption, GenericOptions, PgIdent, Role,
};
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
use futures::StreamExt;
use indexmap::IndexMap;
use ini::Ini;
@@ -186,42 +184,6 @@ impl DatabaseExt for Database {
}
}
pub trait DatabricksSettingsExt {
fn as_pg_settings(&self) -> String;
}
impl DatabricksSettingsExt for DatabricksSettings {
fn as_pg_settings(&self) -> String {
// Postgres GUCs rendered from DatabricksSettings
vec![
// ssl_ca_file
Some(format!(
"ssl_ca_file = '{}'",
self.pg_compute_tls_settings.ca_file
)),
// [Optional] databricks.workspace_url
Some(format!(
"databricks.workspace_url = '{}'",
&self.databricks_workspace_host
)),
// todo(vikas.jain): these are not required anymore as they are moved to static
// conf but keeping these to avoid image mismatch between hcc and pg.
// Once hcc and pg are in sync, we can remove these.
//
// databricks.enable_databricks_identity_login
Some("databricks.enable_databricks_identity_login = true".to_string()),
// databricks.enable_sql_restrictions
Some("databricks.enable_sql_restrictions = true".to_string()),
]
.into_iter()
// Removes `None`s
.flatten()
.collect::<Vec<String>>()
.join("\n")
+ "\n"
}
}
/// Generic trait used to provide quoting / encoding for strings used in the
/// Postgres SQL queries and DATABASE_URL.
pub trait Escaping {
@@ -299,9 +261,9 @@ pub async fn get_existing_dbs_async(
.query_raw::<str, &String, &[String; 0]>(
"SELECT
datname AS name,
(SELECT rolname FROM pg_catalog.pg_roles WHERE oid OPERATOR(pg_catalog.=) datdba) AS owner,
(SELECT rolname FROM pg_roles WHERE oid = datdba) AS owner,
NOT datallowconn AS restrict_conn,
datconnlimit OPERATOR(pg_catalog.=) (OPERATOR(pg_catalog.-) 2) AS invalid
datconnlimit = - 2 AS invalid
FROM
pg_catalog.pg_database;",
&[],

View File

@@ -1,30 +0,0 @@
use anyhow::{Context, anyhow};
// Run `/usr/local/bin/pg_isready -p {port}`
// Check the connectivity of PG
// Success means PG is listening on the port and accepting connections
// Note that PG does not need to authenticate the connection, nor reserve a connection quota for it.
// See https://www.postgresql.org/docs/current/app-pg-isready.html
pub fn pg_isready(bin: &str, port: u16) -> anyhow::Result<()> {
let child_result = std::process::Command::new(bin)
.arg("-p")
.arg(port.to_string())
.spawn();
child_result
.context("spawn() failed")
.and_then(|mut child| child.wait().context("wait() failed"))
.and_then(|status| match status.success() {
true => Ok(()),
false => Err(anyhow!("process exited with {status}")),
})
// wrap any prior error with the overall context that we couldn't run the command
.with_context(|| format!("could not run `{bin} --port {port}`"))
}
// It's safe to assume pg_isready is under the same directory with postgres,
// because it is a PG util bin installed along with postgres
pub fn get_pg_isready_bin(pgbin: &str) -> String {
let split = pgbin.split("/").collect::<Vec<&str>>();
split[0..split.len() - 1].join("/") + "/pg_isready"
}

View File

@@ -1,6 +1,4 @@
use std::fs::File;
use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use anyhow::{Result, anyhow, bail};
@@ -135,25 +133,10 @@ pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result
}
/// Check `pg_hba.conf` and update if needed to allow external connections.
pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) -> Result<()> {
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of config.json
let pghba_path = pgdata_path.join("pg_hba.conf");
// Update pg_hba to contains databricks specfic settings before adding neon settings
// PG uses the first record that matches to perform authentication, so we need to have
// our rules before the default ones from neon.
// See https://www.postgresql.org/docs/current/auth-pg-hba-conf.html
if let Some(databricks_pg_hba) = databricks_pg_hba {
if config::line_in_file(
&pghba_path,
&format!("include_if_exists {}\n", *databricks_pg_hba),
)? {
info!("updated pg_hba.conf to include databricks_pg_hba.conf");
} else {
info!("pg_hba.conf already included databricks_pg_hba.conf");
}
}
if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
info!("updated pg_hba.conf to allow external connections");
} else {
@@ -163,59 +146,6 @@ pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) ->
Ok(())
}
/// Check `pg_ident.conf` and update if needed to allow databricks config.
pub fn update_pg_ident(pgdata_path: &Path, databricks_pg_ident: Option<&String>) -> Result<()> {
info!("checking pg_ident.conf");
let pghba_path = pgdata_path.join("pg_ident.conf");
// Update pg_ident to contains databricks specfic settings
if let Some(databricks_pg_ident) = databricks_pg_ident {
if config::line_in_file(
&pghba_path,
&format!("include_if_exists {}\n", *databricks_pg_ident),
)? {
info!("updated pg_ident.conf to include databricks_pg_ident.conf");
} else {
info!("pg_ident.conf already included databricks_pg_ident.conf");
}
}
Ok(())
}
/// Copy tls key_file and cert_file from k8s secret mount directory
/// to pgdata and set private key file permissions as expected by Postgres.
/// See this doc for expected permission <https://www.postgresql.org/docs/current/ssl-tcp.html>
/// K8s secrets mount on dblet does not honor permission and ownership
/// specified in the Volume or VolumeMount. So we need to explicitly copy the file and set the permissions.
pub fn copy_tls_certificates(
key_file: &String,
cert_file: &String,
pgdata_path: &Path,
) -> Result<()> {
let files = [cert_file, key_file];
for file in files.iter() {
let source = Path::new(file);
let dest = pgdata_path.join(source.file_name().unwrap());
if !dest.exists() {
std::fs::copy(source, &dest)?;
info!(
"Copying tls file: {} to {}",
&source.display(),
&dest.display()
);
}
if *file == key_file {
// Postgres requires private key to be readable only by the owner by having
// chmod 600 permissions.
let permissions = Permissions::from_mode(0o600);
fs::set_permissions(&dest, permissions)?;
info!("Setting permission on {}.", &dest.display());
}
}
Ok(())
}
/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of config.json
@@ -240,11 +170,7 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
}
#[instrument(skip_all)]
pub async fn handle_migrations(
params: ComputeNodeParams,
client: &mut Client,
lakebase_mode: bool,
) -> Result<()> {
pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -> Result<()> {
info!("handle migrations");
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@@ -308,7 +234,7 @@ pub async fn handle_migrations(
),
];
MigrationRunner::new(client, &migrations, lakebase_mode)
MigrationRunner::new(client, &migrations)
.run_migrations()
.await?;

View File

@@ -13,19 +13,17 @@ use tokio_postgres::Client;
use tokio_postgres::error::SqlState;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState, create_databricks_roles};
use crate::hadron_metrics::COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS;
use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState};
use crate::pg_helpers::{
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async,
get_existing_roles_async,
};
use crate::spec_apply::ApplySpecPhase::{
AddDatabricksGrants, AlterDatabricksRoles, CreateAndAlterDatabases, CreateAndAlterRoles,
CreateAvailabilityCheck, CreateDatabricksMisc, CreateDatabricksRoles, CreatePgauditExtension,
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension,
CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon,
DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
HandleDatabricksAuthExtension, HandleNeonExtension, HandleOtherExtensions,
RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions,
@@ -82,7 +80,7 @@ impl ComputeNode {
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
drop_subscriptions_done = match
client.query("select 1 from neon.drop_subscriptions_done where timeline_id OPERATOR(pg_catalog.=) $1", &[&timeline_id.to_string()]).await {
client.query("select 1 from neon.drop_subscriptions_done where timeline_id = $1", &[&timeline_id.to_string()]).await {
Ok(result) => !result.is_empty(),
Err(e) =>
{
@@ -168,7 +166,6 @@ impl ComputeNode {
concurrency_token.clone(),
db,
[DropLogicalSubscriptions].to_vec(),
self.params.lakebase_mode,
);
Ok(tokio::spawn(fut))
@@ -189,33 +186,15 @@ impl ComputeNode {
};
}
let phases = if self.params.lakebase_mode {
vec![
CreatePrivilegedRole,
// BEGIN_HADRON
CreateDatabricksRoles,
AlterDatabricksRoles,
// END_HADRON
for phase in [
CreatePrivilegedRole,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
RenameAndDeleteDatabases,
CreateAndAlterDatabases,
CreateSchemaNeon,
]
} else {
vec![
CreatePrivilegedRole,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
RenameAndDeleteDatabases,
CreateAndAlterDatabases,
CreateSchemaNeon,
]
};
for phase in phases {
] {
info!("Applying phase {:?}", &phase);
apply_operations(
params.clone(),
@@ -224,7 +203,6 @@ impl ComputeNode {
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
self.params.lakebase_mode,
)
.await?;
}
@@ -276,7 +254,6 @@ impl ComputeNode {
concurrency_token.clone(),
db,
phases,
self.params.lakebase_mode,
);
Ok(tokio::spawn(fut))
@@ -288,28 +265,12 @@ impl ComputeNode {
handle.await??;
}
let mut phases = if self.params.lakebase_mode {
vec![
HandleOtherExtensions,
HandleNeonExtension, // This step depends on CreateSchemaNeon
// BEGIN_HADRON
HandleDatabricksAuthExtension,
// END_HADRON
CreateAvailabilityCheck,
DropRoles,
// BEGIN_HADRON
AddDatabricksGrants,
CreateDatabricksMisc,
// END_HADRON
]
} else {
vec![
let mut phases = vec![
HandleOtherExtensions,
HandleNeonExtension, // This step depends on CreateSchemaNeon
CreateAvailabilityCheck,
DropRoles,
]
};
];
// This step depends on CreateSchemaNeon
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
@@ -342,7 +303,6 @@ impl ComputeNode {
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
self.params.lakebase_mode,
)
.await?;
}
@@ -368,7 +328,6 @@ impl ComputeNode {
concurrency_token: Arc<tokio::sync::Semaphore>,
db: DB,
subphases: Vec<PerDatabasePhase>,
lakebase_mode: bool,
) -> Result<()> {
let _permit = concurrency_token.acquire().await?;
@@ -396,7 +355,6 @@ impl ComputeNode {
let client = client_conn.as_ref().unwrap();
Ok(client)
},
lakebase_mode,
)
.await?;
}
@@ -453,8 +411,7 @@ impl ComputeNode {
.map(|limit| match limit {
0..10 => limit,
10..30 => 10,
30..300 => limit / 3,
300.. => 100,
30.. => limit / 3,
})
// If we didn't find max_connections, default to 10 concurrent connections.
.unwrap_or(10)
@@ -519,10 +476,6 @@ pub enum PerDatabasePhase {
#[derive(Clone, Debug)]
pub enum ApplySpecPhase {
CreatePrivilegedRole,
// BEGIN_HADRON
CreateDatabricksRoles,
AlterDatabricksRoles,
// END_HADRON
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
@@ -535,14 +488,7 @@ pub enum ApplySpecPhase {
DisablePostgresDBPgAudit,
HandleOtherExtensions,
HandleNeonExtension,
// BEGIN_HADRON
HandleDatabricksAuthExtension,
// END_HADRON
CreateAvailabilityCheck,
// BEGIN_HADRON
AddDatabricksGrants,
CreateDatabricksMisc,
// END_HADRON
DropRoles,
FinalizeDropLogicalSubscriptions,
}
@@ -578,7 +524,6 @@ pub async fn apply_operations<'a, Fut, F>(
jwks_roles: Arc<HashSet<String>>,
apply_spec_phase: ApplySpecPhase,
client: F,
lakebase_mode: bool,
) -> Result<()>
where
F: FnOnce() -> Fut,
@@ -625,23 +570,6 @@ where
},
query
);
if !lakebase_mode {
return res;
}
// BEGIN HADRON
if let Err(e) = res.as_ref() {
if let Some(sql_state) = e.code() {
if sql_state.code() == "57014" {
// SQL State 57014 (ERRCODE_QUERY_CANCELED) is used for statement timeouts.
// Increment the counter whenever a statement timeout occurs. Timeouts on
// this configuration path can only occur due to PS connectivity problems that
// Postgres failed to recover from.
COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.inc();
}
}
}
// END HADRON
res
}
.instrument(inspan)
@@ -679,44 +607,10 @@ async fn get_operations<'a>(
ApplySpecPhase::CreatePrivilegedRole => Ok(Box::new(once(Operation {
query: format!(
include_str!("sql/create_privileged_role.sql"),
privileged_role_name = params.privileged_role_name,
privileges = if params.lakebase_mode {
"CREATEDB CREATEROLE NOLOGIN BYPASSRLS"
} else {
"CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS"
}
privileged_role_name = params.privileged_role_name
),
comment: None,
}))),
// BEGIN_HADRON
// New Hadron phase
ApplySpecPhase::CreateDatabricksRoles => {
let queries = create_databricks_roles();
let operations = queries.into_iter().map(|query| Operation {
query,
comment: None,
});
Ok(Box::new(operations))
}
// Backfill existing databricks_reader_* roles with statement timeout from GUC
ApplySpecPhase::AlterDatabricksRoles => {
let query = String::from(include_str!(
"sql/alter_databricks_reader_roles_timeout.sql"
));
let operations = once(Operation {
query,
comment: Some(
"Backfill existing databricks_reader_* roles with statement timeout"
.to_string(),
),
});
Ok(Box::new(operations))
}
// End of new Hadron Phase
// END_HADRON
ApplySpecPhase::DropInvalidDatabases => {
let mut ctx = ctx.write().await;
let databases = &mut ctx.dbs;
@@ -1086,10 +980,7 @@ async fn get_operations<'a>(
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
role_name = escaped_role,
outer_tag = outer_tag,
)
// HADRON change:
.replace("neon_superuser", &params.privileged_role_name),
// HADRON change end ,
),
comment: None,
},
// This now will only drop privileges of the role
@@ -1125,8 +1016,7 @@ async fn get_operations<'a>(
comment: None,
},
Operation {
query: String::from(include_str!("sql/default_grants.sql"))
.replace("neon_superuser", &params.privileged_role_name),
query: String::from(include_str!("sql/default_grants.sql")),
comment: None,
},
]
@@ -1142,9 +1032,7 @@ async fn get_operations<'a>(
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
if libs.contains("pg_stat_statements") {
return Ok(Box::new(once(Operation {
query: String::from(
"CREATE EXTENSION IF NOT EXISTS pg_stat_statements WITH SCHEMA public",
),
query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
comment: Some(String::from("create system extensions")),
})));
}
@@ -1152,13 +1040,11 @@ async fn get_operations<'a>(
Ok(Box::new(empty()))
}
ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit WITH SCHEMA public"),
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit"),
comment: Some(String::from("create pgaudit extensions")),
}))),
ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
query: String::from(
"CREATE EXTENSION IF NOT EXISTS pgauditlogtofile WITH SCHEMA public",
),
query: String::from("CREATE EXTENSION IF NOT EXISTS pgauditlogtofile"),
comment: Some(String::from("create pgauditlogtofile extensions")),
}))),
// Disable pgaudit logging for postgres database.
@@ -1182,7 +1068,7 @@ async fn get_operations<'a>(
},
Operation {
query: String::from(
"UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name AND extrelocatable OPERATOR(pg_catalog.=) false",
"UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
),
comment: Some(String::from("compat/fix: make neon relocatable")),
},
@@ -1199,28 +1085,6 @@ async fn get_operations<'a>(
Ok(Box::new(operations))
}
// BEGIN_HADRON
// Note: we may want to version the extension someday, but for now we just drop it and recreate it.
ApplySpecPhase::HandleDatabricksAuthExtension => {
let operations = vec![
Operation {
query: String::from("DROP EXTENSION IF EXISTS databricks_auth"),
comment: Some(String::from("dropping existing databricks_auth extension")),
},
Operation {
query: String::from("CREATE EXTENSION databricks_auth"),
comment: Some(String::from("creating databricks_auth extension")),
},
Operation {
query: String::from("GRANT SELECT ON databricks_auth_metrics TO pg_monitor"),
comment: Some(String::from("grant select on databricks auth counters")),
},
]
.into_iter();
Ok(Box::new(operations))
}
// END_HADRON
ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
comment: None,
@@ -1238,63 +1102,6 @@ async fn get_operations<'a>(
Ok(Box::new(operations))
}
// BEGIN_HADRON
// New Hadron phases
//
// Grants permissions to roles that are used by Databricks.
ApplySpecPhase::AddDatabricksGrants => {
let operations = vec![
Operation {
query: String::from("GRANT USAGE ON SCHEMA neon TO databricks_monitor"),
comment: Some(String::from(
"Permissions needed to execute neon.* functions (in the postgres database)",
)),
},
Operation {
query: String::from(
"GRANT SELECT, INSERT, UPDATE ON health_check TO databricks_monitor",
),
comment: Some(String::from("Permissions needed for read and write probes")),
},
Operation {
query: String::from(
"GRANT EXECUTE ON FUNCTION pg_ls_dir(text) TO databricks_monitor",
),
comment: Some(String::from(
"Permissions needed to monitor .snap file counts",
)),
},
Operation {
query: String::from(
"GRANT SELECT ON neon.neon_perf_counters TO databricks_monitor",
),
comment: Some(String::from(
"Permissions needed to access neon performance counters view",
)),
},
Operation {
query: String::from(
"GRANT EXECUTE ON FUNCTION neon.get_perf_counters() TO databricks_monitor",
),
comment: Some(String::from(
"Permissions needed to execute the underlying performance counters function",
)),
},
]
.into_iter();
Ok(Box::new(operations))
}
// Creates minor objects that are used by Databricks.
ApplySpecPhase::CreateDatabricksMisc => Ok(Box::new(once(Operation {
query: String::from(include_str!("sql/create_databricks_misc.sql")),
comment: Some(String::from(
"The function databricks_monitor uses to convert exception to 0 or 1",
)),
}))),
// End of new Hadron phases
// END_HADRON
ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
comment: None,

View File

@@ -3,17 +3,16 @@ BEGIN
IF NOT EXISTS(
SELECT 1
FROM pg_catalog.pg_tables
WHERE tablename::pg_catalog.name OPERATOR(pg_catalog.=) 'health_check'::pg_catalog.name
AND schemaname::pg_catalog.name OPERATOR(pg_catalog.=) 'public'::pg_catalog.name
WHERE tablename = 'health_check'
)
THEN
CREATE TABLE public.health_check (
id pg_catalog.int4 primary key generated by default as identity,
updated_at pg_catalog.timestamptz default pg_catalog.now()
CREATE TABLE health_check (
id serial primary key,
updated_at timestamptz default now()
);
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = pg_catalog.now();
SET updated_at = now();
END IF;
END
$$

View File

@@ -1,25 +0,0 @@
DO $$
DECLARE
reader_role RECORD;
timeout_value TEXT;
BEGIN
-- Get the current GUC setting for reader statement timeout
SELECT current_setting('databricks.reader_statement_timeout', true) INTO timeout_value;
-- Only proceed if timeout_value is not null/empty and not '0' (disabled)
IF timeout_value IS NOT NULL AND timeout_value != '' AND timeout_value != '0' THEN
-- Find all databricks_reader_* roles and update their statement_timeout
FOR reader_role IN
SELECT r.rolname
FROM pg_roles r
WHERE r.rolname ~ '^databricks_reader_\d+$'
LOOP
-- Apply the timeout setting to the role (will overwrite existing setting)
EXECUTE format('ALTER ROLE %I SET statement_timeout = %L',
reader_role.rolname, timeout_value);
RAISE LOG 'Updated statement_timeout = % for role %', timeout_value, reader_role.rolname;
END LOOP;
END IF;
END
$$;

View File

@@ -0,0 +1,12 @@
DO $$
DECLARE
query varchar;
BEGIN
FOR query IN SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {db_owner};'
FROM pg_proc p
JOIN pg_namespace nsp ON p.pronamespace = nsp.oid
WHERE nsp.nspname = 'anon' LOOP
EXECUTE query;
END LOOP;
END
$$;

View File

@@ -1,15 +0,0 @@
ALTER ROLE databricks_monitor SET statement_timeout = '60s';
CREATE OR REPLACE FUNCTION health_check_write_succeeds()
RETURNS INTEGER AS $$
BEGIN
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = now();
RETURN 1;
EXCEPTION WHEN OTHERS THEN
RAISE EXCEPTION '[DATABRICKS_SMGR] health_check failed: [%] %', SQLSTATE, SQLERRM;
RETURN 0;
END;
$$ LANGUAGE plpgsql;

View File

@@ -1,8 +1,8 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname OPERATOR(pg_catalog.=) '{privileged_role_name}'::pg_catalog.name)
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{privileged_role_name}')
THEN
CREATE ROLE {privileged_role_name} {privileges} IN ROLE pg_read_all_data, pg_write_all_data;
CREATE ROLE {privileged_role_name} CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
END IF;
END
$$;

View File

@@ -4,14 +4,14 @@ $$
IF EXISTS(
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname OPERATOR(pg_catalog.=) 'public'
WHERE nspname = 'public'
) AND
pg_catalog.current_setting('server_version_num')::int OPERATOR(pg_catalog./) 10000 OPERATOR(pg_catalog.>=) 15
current_setting('server_version_num')::int / 10000 >= 15
THEN
IF EXISTS(
SELECT rolname
FROM pg_catalog.pg_roles
WHERE rolname OPERATOR(pg_catalog.=) 'web_access'
WHERE rolname = 'web_access'
)
THEN
GRANT CREATE ON SCHEMA public TO web_access;
@@ -20,7 +20,7 @@ $$
IF EXISTS(
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname OPERATOR(pg_catalog.=) 'public'
WHERE nspname = 'public'
)
THEN
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;

View File

@@ -2,17 +2,11 @@ DO ${outer_tag}$
DECLARE
subname TEXT;
BEGIN
LOCK TABLE pg_catalog.pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN
SELECT pg_subscription.subname
FROM pg_catalog.pg_subscription
WHERE subdbid OPERATOR(pg_catalog.=) (
SELECT oid FROM pg_database WHERE datname OPERATOR(pg_catalog.=) {datname_str}::pg_catalog.name
)
LOOP
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
EXECUTE pg_catalog.format('DROP SUBSCRIPTION %I;', subname);
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
EXECUTE format('DROP SUBSCRIPTION %I;', subname);
END LOOP;
END;
${outer_tag}$;

View File

@@ -3,19 +3,19 @@ BEGIN
IF NOT EXISTS(
SELECT 1
FROM pg_catalog.pg_tables
WHERE tablename OPERATOR(pg_catalog.=) 'drop_subscriptions_done'::pg_catalog.name
AND schemaname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name
WHERE tablename = 'drop_subscriptions_done'
AND schemaname = 'neon'
)
THEN
CREATE TABLE neon.drop_subscriptions_done
(id pg_catalog.int4 primary key generated by default as identity, timeline_id pg_catalog.text);
(id serial primary key, timeline_id text);
END IF;
-- preserve the timeline_id of the last drop_subscriptions run
-- to ensure that the cleanup of a timeline is executed only once.
-- use upsert to avoid the table bloat in case of cascade branching (branch of a branch)
INSERT INTO neon.drop_subscriptions_done VALUES (1, pg_catalog.current_setting('neon.timeline_id'))
INSERT INTO neon.drop_subscriptions_done VALUES (1, current_setting('neon.timeline_id'))
ON CONFLICT (id) DO UPDATE
SET timeline_id = pg_catalog.current_setting('neon.timeline_id')::pg_catalog.text;
SET timeline_id = current_setting('neon.timeline_id');
END
$$

Some files were not shown because too many files have changed in this diff Show More