Compare commits

..

85 Commits

Author SHA1 Message Date
Mikhail Kot
8004abbb9c fix 2025-06-24 12:50:59 +01:00
Mikhail Kot
d817dbab37 style 2025-06-24 12:41:30 +01:00
Mikhail Kot
a85cd674d2 NO_CLOSE_RANGE macro for old glibc/kernel builds 2025-06-24 12:41:30 +01:00
Arpad Müller
552249607d apply clippy fixes for 1.88.0 beta (#12331)
The 1.88.0 stable release is near (this Thursday). We'd like to fix most
warnings beforehand so that the compiler upgrade doesn't require
approval from too many teams.

This is therefore a preparation PR (like similar PRs before it).

There is a lot of changes for this release, mostly because the
`uninlined_format_args` lint has been added to the `style` lint group.
One can read more about the lint
[here](https://rust-lang.github.io/rust-clippy/master/#/uninlined_format_args).

The PR is the result of `cargo +beta clippy --fix` and `cargo fmt`. One
remaining warning is left for the proxy team.

---------

Co-authored-by: Conrad Ludgate <conrad@neon.tech>
2025-06-24 10:12:42 +00:00
Ivan Efremov
a29772bf6e Create proxy-bench periodic run in CI (#12242)
Currently run for test only via pushing to the test-proxy-bench branch.

Relates to the #22681
2025-06-24 09:54:43 +00:00
Arpad Müller
0efff1db26 Allow cancellation errors in tests that allow timeline deletion errors (#12315)
After merging of PR https://github.com/neondatabase/neon/pull/11712 we
saw some tests be flaky, with errors showing up about the timeline
having been cancelled instead of having been deleted. This is an outcome
that is inherently racy with the "has been deleted" error.

In some instances, https://github.com/neondatabase/neon/pull/11712 has
already added the error about the timeline having been cancelled. This
PR adds them to the remaining instances of
https://github.com/neondatabase/neon/pull/11712, fixing the flakiness.
2025-06-23 22:26:38 +00:00
Aleksandr Sarantsev
5eecde461d storcon: Fix migration for Attached(0) tenants (#12256)
## Problem

`Attached(0)` tenant migrations can get stuck if the heatmap file has
not been uploaded.

## Summary of Changes

- Added a test to reproduce the issue.
- Introduced a `kick_secondary_downloads` config flag:
  - Enabled in testing environments.
  - Disabled in production (and in the new test).
- Updated `Attached(0)` locations to consider the number of secondaries
in their intent when deciding whether to download the heatmap.
2025-06-23 18:55:26 +00:00
Alex Chi Z.
85164422d0 feat(pageserver): support force overriding feature flags (#12233)
## Problem

Part of #11813 

## Summary of changes

Add a test API to make it easier to manipulate the feature flags within
tests.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-23 17:31:53 +00:00
John Spray
6c3aba7c44 storcon: adjust AZ selection for heterogenous AZs (#12296)
## Problem

The scheduler uses total shards per AZ to select the AZ for newly
created or attached tenants.

This makes bad decisions when we have different node counts per AZ -- we
might have 2 very busy pageservers in one AZ, and 4 more lightly loaded
pageservers in other AZs, and the scheduler picks the busy pageservers
because the total shard count in their AZ is lower.

## Summary of changes

- Divide the shard count by the number of nodes in the AZ when scoring
in `get_az_for_new_tenant`

---------

Co-authored-by: John Spray <john.spray@databricks.com>
2025-06-23 15:50:31 +00:00
Erik Grinaker
68a175d545 test_runner: fix test_basebackup_with_high_slru_count gzip param (#12319)
The `--gzip-probability` parameter was removed in #12250. However,
`test_basebackup_with_high_slru_count` still uses it, and keeps failing.

This patch removes the use of the parameter (gzip is enabled by
default).
2025-06-23 15:33:45 +00:00
Alex Chi Z.
5e2c444525 fix(pageserver): reduce default feature flag refresh interval (#12246)
## Problem

Part of #11813 

## Summary of changes

The current interval is 30s and it costs a lot of $$$. This patch
reduced it to 600s refresh interval (which means that it takes 10min for
feature flags to propagate from UI to the pageserver). In the future we
can let storcon retrieve the feature flags and push it to pageservers.
We can consider creating a new release or we can postpone this to the
week after the next week.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-23 13:51:21 +00:00
Heikki Linnakangas
8d711229c1 ci: Fix bogus skipping of 'make all' step in CI (#12318)
The 'make all' step must run always. PR #12311 accidentally left the
condition in there to skip it if there were no changes in postgres v14
sources. That condition belonged to a whole different step that was
removed altogether in PR#12311, and the condition should've been removed
too.

Per CI failure:
https://github.com/neondatabase/neon/actions/runs/15820148967/job/44587394469
2025-06-23 13:23:33 +00:00
Vlad Lazar
0e490f3be7 pageserver: allow concurrent rw IO on in-mem layer (#12151)
## Problem

Previously, we couldn't read from an in-memory layer while a batch was
being written to it. Vice-versa, we couldn't write to it while there
was an on-going read.

## Summary of Changes

The goal of this change is to improve concurrency. Writes happened
through a &mut self method so the enforcement was at the type system
level.

We attempt to improve by:
1. Adding interior mutability to EphemeralLayer. This involves wrapping
   the buffered writer in a read-write lock.
2. Minimise the time that the read lock is held for. Only hold the read
   lock while reading from the buffers (recently flushed or pending
   flush). If we need to read from the file, drop the lock and allow IO
   to be concurrent.
   
The new benchmark variants with concurrent reads improve between 70 to
200 percent (against main).
Benchmark results are in this
[commit](891f094ce6).

## Future Changes

We can push the interior mutability into the buffered writer. The
mutable tail goes under a read lock, the flushed part goes into an
ArcSwap and then we can read from anything that is flushed _without_ any
locking.
2025-06-23 13:17:30 +00:00
Erik Grinaker
7e41ef1bec pageserver: set gRPC basebackup chunk size to 256 KB (#12314)
gRPC base backups send a stream of fixed-size 64KB chunks.

pagebench basebackup with compression enabled shows this to reduce
throughput:

* 64 KB: 55 RPS
* 128 KB: 69 RPS
* 256 KB: 73 RPS
* 1024 KB: 73 RPS

This patch sets the base backup chunk size to 256 KB.
2025-06-23 12:41:11 +00:00
Heikki Linnakangas
7916aa26e0 Stop using build-tools image in compute image build (#12306)
The build-tools image contains various build tools and dependencies,
mostly Rust-related. The compute image build used it to build
compute_ctl and a few other little rust binaries that are included in
the compute image. However, for extensions built in Rust (pgrx), the
build used a different layer which installed the rust toolchain using
rustup.

Switch to using the same rust toolchain for both pgrx-based extensions
and compute_ctl et al. Since we don't need anything else from the
build-tools image, I switched to using the toolchain installed with
rustup, and eliminated the dependency to build-tools altogether. The
compute image build no longer depends on build-tools.

Note: We no longer use 'mold' for linking compute_ctl et al, since mold
is not included in the build-deps-with-cargo layer. We could add it
there, but it doesn't seem worth it. I proposed stopping using mold
altogether in https://github.com/neondatabase/neon/pull/10735, but that
was rejected because 'mold' is faster for incremental builds. That
doesn't matter much for docker builds however, since they're not
incremental, and the compute binaries are not as large as the storage
server binaries anyway.
2025-06-23 09:11:05 +00:00
Heikki Linnakangas
52ab8f3e65 Use make all in the "Build and Test locally" CI workflow (#12311)
To avoid duplicating the build logic. `make all` covers the separate
`postgres-*` and `neon-pg-ext` steps, and also does `cargo build`.
That's how you would typically do a full local build anyway.
2025-06-23 09:10:32 +00:00
Heikki Linnakangas
3d822dbbde Refactor Makefile rules for building the extensions under pgxn/ (#12305) 2025-06-22 19:43:14 +00:00
Heikki Linnakangas
af46b5286f Avoid recompiling postgres_ffi when there has been no changes (#12292)
Every time you run `make`, it runs `make install` on all the PostgreSQL
sources, which copies the header files. That in turn triggers a rebuild
of the `postgres_ffi` crate, and everything that depends on it. We had
worked around this earlier (see #2458), by passing a custom INSTALL
script to the Postgres makefiles, which refrains from updating the
modification timestamp on headers when they have not been changed, but
the v14 makefile didn't obey INSTALL for the header files. Backporting
c0a1d7621b to v14 fixes that.

This backports upstream PostgreSQL commit c0a1d7621b to v14.

Corresponding PR in the 'postgres' repo:
https://github.com/neondatabase/postgres/pull/660
2025-06-21 21:07:38 +00:00
Erik Grinaker
47f7efee06 pageserver: require stripe size (#12257)
## Problem

In #12217, we began passing the stripe size in reattach responses, and
persisting it in the on-disk state. This is necessary to ensure the
storage controller and Pageserver have a consistent view of the intended
stripe size of unsharded tenants, which will be used for splits that do
not specify a stripe size. However, for backwards compatibility, these
stripe sizes were optional.

## Summary of changes

Make the stripe sizes required for reattach responses and on-disk
location configs. These will always be provided by the previous
(current) release.
2025-06-21 15:01:29 +00:00
Tristan Partin
868c38f522 Rename the compute_ctl admin scope to compute_ctl:admin (#12263)
Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-20 22:49:05 +00:00
Tristan Partin
c8b2ac93cf Allow the control plane to override any Postgres connection options (#12262)
The previous behavior was for the compute to override control plane
options if there was a conflict. We want to change the behavior so that
the control plane has the absolute power on what is right. In the event
that we need a new option passed to the compute as soon as possible, we
can initially roll it out in the control plane, and then migrate the
option to EXTRA_OPTIONS within the compute later, for instance.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-20 18:46:30 +00:00
Dmitrii Kovalkov
b2954d16ff storcon, neon_local: add timeline_safekeeper_count (#12303)
## Problem
We need to specify the number of safekeepers for neon_local without
`testing` feature.
Also we need this option for testing different configurations of
safekeeper migration code.

We cannot set it in `neon_fixtures.py` and in the default config of
`neon_local` yet, because it will fail compatibility tests. I'll make a
separate PR with removing `cfg!("testing")` completely and specifying
this option in the config when this option reaches the release branch.

- Part of https://github.com/neondatabase/neon/issues/12298

## Summary of changes
- Add `timeline_safekeeper_count` config option to storcon and
neon_local
2025-06-20 16:03:17 +00:00
Alex Chi Z.
79485e7c3a feat(pageserver): enable gc-compaction by default everywhere (#12105)
Enable it across tests and set it as default. Marks the first milestone
of https://github.com/neondatabase/neon/issues/9114. We already enabled
it in all AWS regions and planning to enable it in all Azure regions
next week.

will merge after we roll out in all regions.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-20 15:35:11 +00:00
Heikki Linnakangas
eaf1ab21c4 Store intermediate build files in build/ rather than pg_install/build/ (#12295)
This way, `pg_install` contains only the final build artifacts, not
intermediate files like *.o files. Seems cleaner.
2025-06-20 14:50:03 +00:00
Vlad Lazar
6508f4e5c1 pageserver: revise gc layer map lock handling (#12290)
## Problem

Timeline GC is very aggressive with regards to layer map locking.
We've seen timelines with loads of layers in production that hold the
write lock for the layer map for 30 minutes at a time.
This blocks reads and the write path to some extent.

## Summary of changes

Determining the set of layers to GC is done under the read lock.
Applying the updates is done under the write lock.
Previously, everything was done under write lock.
2025-06-20 11:57:30 +00:00
Conrad Ludgate
a298d2c29b [proxy] replace the batch cancellation queue, shorten the TTL for cancel keys (#11943)
See #11942 

Idea: 
* if connections are short lived, they can get enqueued and then also
remove themselves later if they never made it to redis. This reduces the
load on the queue.
* short lived connections (<10m, most?) will only issue 1 command, we
remove the delete command and rely on ttl.
* we can enqueue as many commands as we want, as we can always cancel
the enqueue, thanks to the ~~intrusive linked lists~~ `BTreeMap`.
2025-06-20 11:48:01 +00:00
Arpad Müller
8b197de7ff Increase upload timeout for test_tenant_s3_restore (#12297)
Increase the upload timeout of the test to avoid hitting timeouts (which
we sometimes do).
 
Fixes https://github.com/neondatabase/neon/issues/12212
2025-06-20 10:33:11 +00:00
Erik Grinaker
15d079cd41 pagebench: improve getpage-latest-lsn gRPC support (#12293)
This improves `pagebench getpage-latest-lsn` gRPC support by:

* Using `page_api::Client`.
* Removing `--protocol`, and using the `page-server-connstring` scheme
instead.
* Adding `--compression` to enable zstd compression.
2025-06-20 08:31:40 +00:00
Erik Grinaker
dc1625cd8e pagebench: add basebackup gRPC support (#12250)
## Problem

Pagebench does not support gRPC for `basebackup` benchmarks.

Requires #12243.
Touches #11728.

## Summary of changes

Add gRPC support via gRPC connstrings, e.g. `pagebench basebackup
--page-service-connstring grpc://localhost:51051`.

Also change `--gzip-probability` to `--no-compression`, since this must
be specified per-client for gRPC.
2025-06-19 15:40:57 +00:00
dependabot[bot]
a6d4de25cd build(deps): bump urllib3 from 1.26.19 to 2.5.0 in the pip group across 1 directory (#12289)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-06-19 14:20:02 +00:00
Arpad Müller
ec1452a559 Switch on --timelines-onto-safekeepers in integration tests (#11712)
Switch on the `--timelines-onto-safekeepers` param in integration tests.
Some changes that were needed to enable this but which I put into other
PRs to not clutter up this one:

* #11786
* #11854
* #12129
* #12138

Further fixes that were needed for this:

* https://github.com/neondatabase/neon/pull/11801
* https://github.com/neondatabase/neon/pull/12143
* https://github.com/neondatabase/neon/pull/12204

Not strictly needed, but helpful:

* https://github.com/neondatabase/neon/pull/12155

Part of #11670
Closes #11424
2025-06-19 11:17:01 +00:00
Heikki Linnakangas
1950ccfe33 Eliminate dependency from pageserver_api to postgres_ffi (#12273)
Introduce a separate `postgres_ffi_types` crate which contains a few
types and functions that were used in the API. `postgres_ffi_types` is a
much small crate than `postgres_ffi`, and it doesn't depend on bindgen
or the Postgres C headers.

Move NeonWalRecord and Value types to wal_decoder crate. They are only
used in the pageserver-safekeeper "ingest" API. The rest of the ingest
API types are defined in wal_decoder, so move these there as well.
2025-06-19 10:31:27 +00:00
Heikki Linnakangas
2ca6665f4a Remove outdated 'clean' Makefile targest (#12288)
We have been bad at keeping them up-to-date, several contrib modules and
neon extensions were missing from the clean rules. Give up trying, and
remove the targets altogether. In practice, it's straightforward to just
do `rm -rf pg_install/build`, so the clean-targets are hardly worth the
maintenance effort.

I kept `make distclean` though. The rule for that is simple enough.
2025-06-19 10:24:09 +00:00
Heikki Linnakangas
fa954671b2 Remove unnecessary Postgres libs from the storage docker image (#12286)
Since commit 87ad50c925, storage_controller has used diesel_async, which
in turn uses tokio-postgres as the Postgres client, which doesn't
require libpq. Thus we no longer need libpq in the storage image.
2025-06-19 10:00:01 +00:00
Erik Grinaker
6f4ffdb48b pageserver: add gRPC compression (#12280)
## Problem

The gRPC page service should support compression.

Requires #12111.
Touches #11728.
Touches https://github.com/neondatabase/cloud/issues/25679.

## Summary of changes

Add support for gzip and zstd compression in the server, and a client
parameter to enable compression.

This will need further benchmarking under realistic network conditions.
2025-06-19 09:54:34 +00:00
Vlad Lazar
3f676df3d5 pageserver: fix initial layer visibility calculation (#12206)
## Problem

GC info is an input to updating layer visibility.
Currently, gc info is updated on timeline activation and visibility is
computed on tenant attach, so we ignore branch points and compute
visibility by taking all layers into account.

Side note: gc info is also updated when timelines are created and
dropped. That doesn't help because we create the timelines in
topological order from the root. Hence the root timeline goes first,
without context of where the branch points are.

The impact of this in prod is that shards need to rehydrate layers after
live migration since the non-visible ones were excluded from the
heatmap.

## Summary of Changes

Move the visibility calculation into tenant attachment instead of
activation.
2025-06-19 09:53:18 +00:00
Suhas Thalanki
20f4febce1 fix: additional changes to terminate pgbouncer on compute suspend (#12153) (#12284)
Addressed [retrospective comments
](https://github.com/neondatabase/neon/pull/12153#discussion_r2154197503)
to https://github.com/neondatabase/neon/pull/12153
2025-06-18 19:31:22 +00:00
Mikhail
762905cf8d endpoint storage: parse config with type:LocalFs|AwsS3|AzureContainer (#12282)
https://github.com/neondatabase/cloud/issues/27195
2025-06-18 17:45:20 +00:00
Elizabeth Murray
830ef35ed3 Domain client for Pageserver GRPC. (#12111)
Add domain client for new communicator GRPC types.
2025-06-18 15:51:49 +00:00
Erik Grinaker
d8d62fb7cb test_runner: add gRPC support (#12279)
## Problem

`test_runner` integration tests should support gRPC.

Touches #11926.

## Summary of changes

* Enable gRPC for Pageservers, with dynamic port allocations.
* Add a `grpc` parameter for endpoint creation, plumbed through to
`neon_local endpoint create`.

No tests actually use gRPC yet, since computes don't support it yet.
2025-06-18 14:05:13 +00:00
Aleksandr Sarantsev
e6a404c66d Fix flaky test_sharding_split_failures (#12199)
## Problem

`test_sharding_failures` is flaky due to interference from the
`background_reconcile` process.

The details are in the issue
https://github.com/neondatabase/neon/issues/12029.

## Summary of changes

- Use `reconcile_until_idle` to ensure a stable state before running
test assertions
- Added error tolerance in `reconcile_until_idle` test function (Failure
cases: 1, 3, 19, 20)
- Ignore the `Keeping extra secondaries` warning message since it i
retryable (Failure case: 2)
- Deduplicated code in `assert_rolled_back` and `assert_split_done`
- Added a log message before printing plenty of Node `X` seen on
pageserver `Y`
2025-06-18 13:27:41 +00:00
Peter Bendel
7e711ede44 Increase tenant size for large tenant oltp workload (#12260)
## Problem

- We run the large tenant oltp workload with a fixed size (larger than
existing customers' workloads).
Our customer's workloads are continuously growing and our testing should
stay ahead of the customers' production workloads.
- we want to touch all tables in the tenant's database (updates) so that
we simulate a continuous change in layer files like in a real production
workload
- our current oltp benchmark uses a mixture of read and write
transactions, however we also want a separate test run with read-only
transactions only

## Summary of changes
- modify the existing workload to have a separate run with pgbench
custom scripts that are read-only
- create a new workload that 
- grows all large tables in each run (for the reuse branch in the large
oltp tenant's project)
- updates a percentage of rows in all large tables in each run (to
enforce table bloat and auto-vacuum runs and layer rebuild in
pageservers

Each run of the new workflow increases the logical database size about
16 GB.
We start with 6 runs per day which will give us about 96-100 GB growth
per day.

---------

Co-authored-by: Alexander Lakhin <alexander.lakhin@neon.tech>
2025-06-18 12:40:25 +00:00
Mikhail
e95f2f9a67 compute_ctl: return LSN in /terminate (#12240)
- Add optional `?mode=fast|immediate` to `/terminate`, `fast` is
default. Immediate avoids waiting 30
  seconds before returning from `terminate`.
- Add `TerminateMode` to `ComputeStatus::TerminationPending`
- Use `/terminate?mode=immediate` in `neon_local` instead of `pg_ctl
stop` for `test_replica_promotes`.
- Change `test_replica_promotes` to check returned LSN
- Annotate `finish_sync_safekeepers` as `noreturn`.

https://github.com/neondatabase/cloud/issues/29807
2025-06-18 12:25:19 +00:00
Heikki Linnakangas
5a045e7d52 Move pagestream_api to separate module (#12272)
For general readability.
2025-06-18 12:03:14 +00:00
Dimitri Fontaine
67fbc0582e Validate safekeeper_connstrings when parsing compute specs. (#11906)
This check API only cheks the safekeeper_connstrings at the moment, and
the validation is limited to checking we have at least one entry in
there, and no duplicates.

## Problem

If the compute_ctl service is started with an empty list of safekeepers,
then hard-to-debug errors may happen at runtime, where it would be much
easier to catch them early.

## Summary of changes

Add an entry point in the compute_ctl API to validate the configuration
for safekeeper_connstrings.

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2025-06-18 10:01:05 +00:00
Heikki Linnakangas
3af6b3a2bf Avoid redownloading rust toolchain on Postgres changes (#12265)
Create a separate stage for downloading the Rust toolchain for pgrx, so
that it can be cached independently of the pg-build layer. Before this,
the 'pg-build-nonroot=with-cargo' layer was unnecessarily rebuilt every
time there was a change in PostgreSQL sources. Furthermore, this allows
using the same cached layer for building the compute images of all
Postgres versions.
2025-06-18 09:49:42 +00:00
Erik Grinaker
04013929cb pageserver: support full gRPC basebackups (#12269)
## Problem

Full basebackups are used in tests, and may be useful for debugging as
well, so we should support them in the gRPC API.

Touches #11728.

## Summary of changes

Add `GetBaseBackupRequest::full` to generate full base backups.

The libpq implementation also allows specifying `prev_lsn` for full
backups, i.e. the end LSN of the previous WAL record. This is omitted in
the gRPC API, since it's not used by any tests, and presumably of
limited value since it's autodetected. We can add it later if we find
that we need it.
2025-06-18 06:48:39 +00:00
Suhas Thalanki
83069f6ca1 fix: terminate pgbouncer on compute suspend (#12153)
## Problem

PgBouncer does not terminate connections on a suspend:
https://github.com/neondatabase/cloud/issues/16282

## Summary of changes

1. Adds a pid file to store the pid of PgBouncer
2. Terminates connections on a compute suspend

---------

Co-authored-by: Alexey Kondratov <kondratov.aleksey@gmail.com>
2025-06-17 22:56:05 +00:00
Mikhail
7d4f662fbf upgrade default neon version to 1.6 (#12185)
Changes for 1.6 were merged and deployed two months ago
https://github.com/neondatabase/neon/blob/main/pgxn/neon/neon--1.6--1.5.sql.
In order to deploy https://github.com/neondatabase/neon/pull/12183, we
need 1.6 to be default, otherwise we can't use prewarm API on read-only
replica (`ALTER EXTENSION` won't work) and we need it for promotion
2025-06-17 17:46:35 +00:00
Alexander Bayandin
a5cac52e26 compute-image: add a patch for onnxruntime (#12274)
## Problem

The checksum for eigen (a dependency for onnxruntime) has changed which
breaks compute image build.

## Summary of changes
- Add a patch for onnxruntime which backports changes from
f57db79743
(we keep the current version)

Ref https://github.com/microsoft/onnxruntime/issues/24861
2025-06-17 16:35:20 +00:00
Konstantin Knizhnik
dfa055f4be Support event trigger for Neon users (#10624)
## Problem

https://github.com/neondatabase/neon/issues/7570

Even triggers are supported only for superusers.

## Summary of changes

Temporary switch to superuser when even trigger is created and disable
execution of user's even triggers under superuser.

---------

Co-authored-by: Dimitri Fontaine <dim@tapoueh.org>
Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-17 15:44:50 +00:00
Erik Grinaker
a4c76740c0 pageserver: emit gRPC GetPage errors as responses (#12255)
## Problem

When converting `proto::GetPageRequest` into `page_api::GetPageRequest`
and validating the request, errors are returned as `tonic::Status`. This
will tear down the GetPage stream, which is disruptive and unnecessary.

## Summary of changes

Emit invalid request errors as `GetPageResponse` with an appropriate
`status_code` instead.

Also move the conversion from `tonic::Status` to `GetPageResponse` out
into the stream handler.
2025-06-17 15:41:17 +00:00
Dmitrii Kovalkov
f2e96b2323 tests: prepare test_compatibility.py for --timelines-onto-safekeepers (#12204)
## Problem
Compatibility tests may be run against a compatibility snapshot
generated with --timelines-onto-safekeepers=false. We need to start the
compute without a generation (or with 0 generation) if the timeline is
not storcon-managed, otherwise the compute will hang.

- Follow up on https://github.com/neondatabase/neon/pull/12203
- Relates to https://github.com/neondatabase/neon/pull/11712

## Summary of changes
- Handle compatibility snapshot generated with no
`--timelines-onot-safekeepers` properly
2025-06-17 15:16:07 +00:00
Dmitrii Kovalkov
dee73f0cb4 pageserver: implement max_total_size_bytes limit for basebackup cache (#12230)
## Problem
The cache was introduced as a hackathon project and the only supported
limit was the number of entries.
The basebackup entry size may vary. We need to have more control over
disk space usage to ship it to production.

- Part of https://github.com/neondatabase/cloud/issues/29353

## Summary of changes
- Store the size of entries in the cache and use it to limit
`max_total_size_bytes`
- Add the size of the cache in bytes to metrics.
2025-06-17 15:08:59 +00:00
Erik Grinaker
edf51688bc neon_local: support gRPC connstrings for endpoints (#12271)
## Problem

`neon_local` should support endpoints using gRPC, by providing `grpc://`
connstrings with the Pageservers' gRPC ports.

Requires #12268.
Touches #11926.

## Summary of changes

* Add `--grpc` switch for `neon_local endpoint create`.
* Generate `grpc://` connstrings for endpoints when enabled.

Computes don't actually support `grpc://` connstrings yet, but will
soon.

gRPC is configured when the endpoint is created, not when it's started,
such that it continues to use gRPC across restarts and reconfigurations.
In particular, this is necessary for the storage controller's local
notify hook, which can't easily plumb through gRPC configuration from
the start/reconfigure commands but has access to the endpoint's
configuration.
2025-06-17 14:39:42 +00:00
Aleksandr Sarantsev
4a8f3508f9 storcon: Add safekeeper request label group (#12239)
## Problem

The metrics `storage_controller_safekeeper_request_error` and
`storage_controller_safekeeper_request_latency` currently use
`pageserver_id` as a label.
This can be misleading, as the metrics are about safekeeper requests.  
We want to replace this with a more accurate label — either
`safekeeper_id` or `node_id`.

## Summary of changes

- Introduced `SafekeeperRequestLabelGroup` with `safekeeper_id`.
- Updated the affected metrics to use the new label group.
- Fixed incorrect metric usage in safekeeper_client.rs

## Follow-up

- Review usage of these metrics in alerting rules and existing Grafana
dashboards to ensure this change does not break something.
2025-06-17 13:33:01 +00:00
Erik Grinaker
48052477b4 storcon: register Pageserver gRPC address (#12268)
## Problem

Pageservers now expose a gRPC API on a separate address and port. This
must be registered with the storage controller such that it can be
plumbed through to the compute via cplane.

Touches #11926.

## Summary of changes

This patch registers the gRPC address and port with the storage
controller:

* Add gRPC address to `nodes` database table and `NodePersistence`, with
a Diesel migration.
* Add gRPC address in `NodeMetadata`, `NodeRegisterRequest`,
`NodeDescribeResponse`, and `TenantLocateResponseShard`.
* Add gRPC address flags to `storcon_cli node-register`.

These changes are backwards-compatible, since all structs will ignore
unknown fields during deserialization.
2025-06-17 13:27:10 +00:00
Erik Grinaker
d81353b2d1 pageserver: gRPC base backup fixes (#12243)
## Problem

The gRPC base backup implementation has a few issues: chunks are not
properly bounded, and it's not possible to omit the LSN.

Touches #11728.

## Summary of changes

* Properly bound chunks by using a limited writer.
* Use an `Option<Lsn>` rather than a `ReadLsn` (the latter requires an
LSN).
2025-06-17 12:37:43 +00:00
Aleksandr Sarantsev
143500dc4f storcon: Improve stably_attached readability (#12249)
## Problem

The `stably_attached` function is hard to read due to deeply nested
conditionals

## Summary of Changes

- Refactored `stably_attached` to use early returns and the `?` operator
for improved readability
2025-06-17 10:10:10 +00:00
Aleksandr Sarantsev
1a5f7ce6ad storcon: Exclude another secondaries while optimizing secondary (#12251)
## Problem

If the node intent includes more than one secondary, we can generate a
replace optimization using a candidate node that is already a secondary
location.

## Summary of changes

- Exclude all other secondary nodes from the scoring process to ensure
optimal candidate selection.
2025-06-17 10:09:55 +00:00
Alexander Lakhin
01ccb34118 Don't rerun failed tests in 'Build and Test with Sanitizers' workflow (#12259)
## Problem

We could easily miss a sanitizer-detected defect, if it occurred due to
some race condition, as we just rerun the test and if it succeeds, the
overall test run is considered successful. It was more reasonable
before, when we had much more unstable tests in main, but now we can
track all test failures.

## Summary of changes
Don't rerun failed tests.
2025-06-17 08:08:43 +00:00
Tristan Partin
f669e18477 Remove TODO comment related to default_transaction_read_only (#12261)
This code has been deployed for a while, so let's remove the TODO, and
remove the option passed from the control plane.

Link: https://github.com/neondatabase/cloud/pull/30274

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-16 19:38:26 +00:00
Suhas Thalanki
632cde7f13 schema and github workflow for validation of compute manifest (#12069)
Adds a schema to validate the manifest.yaml described in [this
RFC](https://github.com/neondatabase/neon/blob/main/docs/rfcs/038-independent-compute-release.md)
and a github workflow to test this.
2025-06-16 19:30:41 +00:00
Alexander Lakhin
118e13438d Add "Build and Test Fully" workflow (#11931)
## Problem

We don't test debug builds for v14..v16 in the regular "Build and Test"
runs to perform the testing faster, but it means we can't detect
assertion failures in those versions.
(See https://github.com/neondatabase/neon/issues/11891,
https://github.com/neondatabase/neon/issues/11997)

## Summary of changes
Add a new workflow to test all build types and all versions on all
architectures.
2025-06-16 13:29:39 +00:00
Trung Dinh
fc136eec8f pagectl: add dump layer local (#12245)
## Problem
In our environment, we don't always have access to the pagectl tool on
the pageserver. We have to download the page files to local env to
introspect them. Hence, it'll be useful to be able to parse the local
files using `pagectl`.

## Summary of changes
* Add `dump-layer-local` to `pagectl` that takes a local path as
argument and returns the layer content:
```
cargo  run -p pagectl layer dump-layer-local ~/Desktop/000000067F000040490002800000FFFFFFFF-030000000000000000000000000000000002__00003E7A53EDE611-00003E7AF27BFD19-v1-00000001
```

* Bonus: Fix a bug in `pageserver/ctl/src/draw_timeline_dir.rs` in which
we don't filter out temporary files.
2025-06-16 10:29:42 +00:00
Erik Grinaker
818e5130f1 page_api: add a few derives (#12253)
## Problem

The `page_api` domain types are missing a few derives.

## Summary of changes

Add `Clone`, `Copy`, and `Debug` derives for all types where
appropriate.
2025-06-16 09:45:50 +00:00
Alexander Sarantcev
c243521ae5 Fix reconcile_long_running metric comment (#12234)
## Problem

Comment for `storage_controller_reconcile_long_running` metric was
copy-pasted and not updated in #9207

## Summary of changes

- Fixed comment
2025-06-16 05:51:57 +00:00
Alexander Sarantcev
5303c71589 Move comment above metrics handler (#12236)
## Problem

Comment is in incorrect place: `/metrics` code is above its description
comment.

## Summary of changes

- `/metrics` code is now below the comment
2025-06-13 18:18:51 +00:00
Alexander Sarantcev
d146897415 Fix reconciles metrics typo (#12235)
## Problem

Need to fix naming `safkeeper` -> `safekeeper`

## Summary of changes

- `storage_controller_safkeeper_reconciles_*` renamed to
`storage_controller_safekeeper_reconciles_*`
2025-06-13 17:47:09 +00:00
Alexander Sarantcev
d63815fa40 Fix ChaosInjector shard eligibility bug (#12231)
## Problem

ChaosInjector is intended to skip non-active scheduling policies, but
the current logic skips active shards instead.

## Summary of changes

- Fixed shard eligibility condition to correctly allow chaos injection
for shards with an Active scheduling policy.
2025-06-13 13:34:29 +00:00
Dmitrii Kovalkov
385324ee8a pageserver: fix post-merge PR comments on basebackup cache (#12216)
## Problem
This PR addresses all but the direct IO post-merge comments on
basebackup cache implementation.

- Follow up on
https://github.com/neondatabase/neon/pull/11989#pullrequestreview-2867966119
- Part of https://github.com/neondatabase/cloud/issues/29353

## Summary of changes
- Clean up the tmp directory by recreating it.
- Recreate the tmp directory on startup.
- Add comments why it's safe to not fsync the inode after renaming.
2025-06-13 08:49:31 +00:00
Alex Chi Z.
8a68d463f6 feat(pagectl): no max key limit if time travel recover locally (#12222)
## Problem

We would easily hit this limit for a tenant running for enough long
time.

## Summary of changes

Remove the max key limit for time-travel recovery if the command is
running locally.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-13 08:41:10 +00:00
Alex Chi Z.
3046c307da feat(posthog_client): support feature flag secure API (#12201)
## Problem

Part of #11813 

PostHog has two endpoints to retrieve feature flags: the old project ID
one that uses personal API token, and the new one using a special
feature flag secure token that can only retrieve feature flag. The new
API I added in this patch is not documented in the PostHog API doc but
it's used in their Python SDK.

## Summary of changes

Add support for "feature flag secure token API". The API has no way of
providing a project ID so we verify if the retrieved spec is consistent
with the project ID specified by comparing the `team_id` field.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-13 07:22:02 +00:00
Dmitrii Kovalkov
e83f1d8ba5 tests: prepare test_historic_storage_formats for --timelines-onto-safekeepers (#12214)
## Problem
`test_historic_storage_formats` uses `/tenant_import` to import historic
data. Tenant import does not create timelines onto safekeepers, because
they might already exist on some safekeeper set. If it does, then we may
end up with two different quorums accepting WAL for the same timeline.

If the tenant import is used in a real deployment, the administrator is
responsible for looking for the proper safekeeper set and migrate
timelines into storcon-managed timelines.

- Relates to https://github.com/neondatabase/neon/pull/11712

## Summary of changes
- Create timelines onto safekeepers manually after tenant import in
`test_historic_storage_formats`
- Add a note to tenant import that timelines will be not storcon-managed
after the import.
2025-06-13 06:28:18 +00:00
Trung Dinh
8917676e86 Improve logging for gc-compaction (#12219)
## Problem
* Inside `compact_with_gc_inner`, there is a similar log line:

db24ba95d1/pageserver/src/tenant/timeline/compaction.rs (L3181-L3187)

* Also, I think it would be useful when debugging to have the ability to
select a particular sub-compaction job (e.g., `1/100`) to see all the
logs for that job.

## Summary of changes
* Attach a span to the `compact_with_gc_inner`. 

CC: @skyzh
2025-06-13 06:07:18 +00:00
Ivan Efremov
43acabd4c2 [proxy]: Improve backoff strategy for redis reconnection (#12218)
Sometimes during a failed redis connection attempt at the init stage
proxy pod can continuously restart. This, in turn, can aggravate the
problem if redis is overloaded.

Solves the #11114
2025-06-12 19:46:02 +00:00
Vlad Lazar
db24ba95d1 pagserver: always persist shard identity (#12217)
## Problem

The location config (which includes the stripe size) is stored on
pageserver disk.
For unsharded tenants we [do not include the shard identity in the
serialized
description](ad88ec9257/pageserver/src/tenant/config.rs (L64-L66)).
When the pageserver restarts, it reads that configuration and will use
the stripe size from there
and rely on storcon input from reattach for generation and mode.

The default deserialization is ShardIdentity::unsharded. This has the
new default stripe size of 2048.
Hence, for unsharded tenants we can be running with a stripe size
different from that the one in the
storcon observed state. This is not a problem until we shard split
without specifying a stripe size (i.e. manual splits via the UI or
storcon_cli). When that happens the new shards will use the 2048 stripe
size until storcon realises and switches them back. At that point it's
too late, since we've ingested data with the wrong stripe sizes.

## Summary of changes

Ideally, we would always have the full shard identity on disk. To
achieve this over two releases we do:
1. Always persist the shard identity in the location config on the PS.
2. Storage controller includes the stripe size to use in the re attach
response.

After the first release, we will start persisting correct stripe sizes
for any tenant shard that the storage controller
explicitly sends a location_conf. After the second release, the
re-attach change kicks in and we'll persist the
shard identity for all shards.
2025-06-12 17:15:02 +00:00
Folke Behrens
1dce65308d Update base64 to 0.22 (#12215)
## Problem

Base64 0.13 is outdated.

## Summary of changes

Update base64 to 0.22. Affects mostly proxy and proxy libs. Also upgrade
serde_with to remove another dep on base64 0.13 from dep tree.
2025-06-12 16:12:47 +00:00
Alex Chi Z.
ad88ec9257 fix(pageserver): extend layer manager read guard threshold (#12211)
## Problem

Follow up of https://github.com/neondatabase/neon/pull/12194 to make the
benchmarks run without warnings.

## Summary of changes

Extend read guard hold timeout to 30s.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-12 08:39:54 +00:00
Dmitrii Kovalkov
60dfdf39c7 tests: prepare test_tenant_delete_stale_shards for --timelines-onto-safekeepers (#12198)
## Problem
The test creates an endpoint and deletes its tenant. The compute cannot
stop gracefully because it tries to write a checkpoint shutdown record
into the WAL, but the timeline had been already deleted from
safekeepers.

- Relates to https://github.com/neondatabase/neon/pull/11712

## Summary of changes
Stop the compute before deleting a tenant
2025-06-12 08:10:22 +00:00
Dmitrii Kovalkov
3d5e2bf685 storcon: add tenant_timeline_locate handler (#12203)
## Problem
Compatibility tests may be run against a compatibility snapshot
generated with `--timelines-onto-safekeepers=false`. We need to start
the compute without a generation (or with 0 generation) if the timeline
is not storcon-managed, otherwise the compute will hang.

This handler is needed to check if the timeline is storcon-managed.
It's also needed for better test coverage of safekeeper migration code.

- Relates to https://github.com/neondatabase/neon/pull/11712

## Summary of changes
- Implement `tenant_timeline_locate` handler in storcon to get
safekeeper info from storcon's DB
2025-06-12 08:09:57 +00:00
dependabot[bot]
54fdcfdfa8 build(deps): bump requests from 2.32.3 to 2.32.4 in the pip group across 1 directory (#12180)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-06-11 21:09:05 +00:00
Vlad Lazar
28e882a80f pageserver: warn on long layer manager locking intervals (#12194)
## Problem

We hold the layer map for too long on occasion.

## Summary of changes

This should help us identify the places where it's happening from.

Related https://github.com/neondatabase/neon/issues/12182
2025-06-11 16:16:30 +00:00
Konstantin Knizhnik
24038033bf Remove default from DROP FUNCTION (#12202)
## Problem

DROP FUNCTION doesn't allow to specify default for parameters.

## Summary of changes

Remove DEFAULT clause from pgxn/neon/neon--1.6--1.5.sql

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-11 13:16:58 +00:00
Mikhail
1b935b1958 endpoint_storage: add ?from_endpoint= to /lfc/prewarm (#12195)
Related: https://github.com/neondatabase/cloud/issues/24225
Add optional from_endpoint parameter to allow prewarming from other
endpoint
2025-06-10 19:25:32 +00:00
333 changed files with 7918 additions and 3452 deletions

View File

@@ -38,6 +38,11 @@ on:
required: false
default: 1
type: number
rerun-failed:
description: 'rerun failed tests to ignore flaky tests'
required: false
default: true
type: boolean
defaults:
run:
@@ -99,11 +104,10 @@ jobs:
# Set some environment variables used by all the steps.
#
# CARGO_FLAGS is extra options to pass to "cargo build", "cargo test" etc.
# It also includes --features, if any
# CARGO_FLAGS is extra options to pass to all "cargo" subcommands.
#
# CARGO_FEATURES is passed to "cargo metadata". It is separate from CARGO_FLAGS,
# because "cargo metadata" doesn't accept --release or --debug options
# CARGO_PROFILE is passed to "cargo build", "cargo test" etc, but not to
# "cargo metadata", because it doesn't accept --release or --debug options.
#
# We run tests with addtional features, that are turned off by default (e.g. in release builds), see
# corresponding Cargo.toml files for their descriptions.
@@ -112,16 +116,16 @@ jobs:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ inputs.sanitizers }}
run: |
CARGO_FEATURES="--features testing"
CARGO_FLAGS="--locked --features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FLAGS="--locked"
CARGO_PROFILE=""
elif [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked"
CARGO_PROFILE=""
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked --release"
CARGO_PROFILE="--release"
fi
if [[ $SANITIZERS == 'enabled' ]]; then
make_vars="WITH_SANITIZERS=yes"
@@ -131,8 +135,8 @@ jobs:
{
echo "cov_prefix=${cov_prefix}"
echo "make_vars=${make_vars}"
echo "CARGO_FEATURES=${CARGO_FEATURES}"
echo "CARGO_FLAGS=${CARGO_FLAGS}"
echo "CARGO_PROFILE=${CARGO_PROFILE}"
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
} >> $GITHUB_ENV
@@ -184,34 +188,18 @@ jobs:
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v14 -j$(nproc)
- name: Build postgres v15
if: steps.cache_pg_15.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v15 -j$(nproc)
- name: Build postgres v16
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v16 -j$(nproc)
- name: Build postgres v17
if: steps.cache_pg_17.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v17 -j$(nproc)
- name: Build neon extensions
run: mold -run make ${make_vars} neon-pg-ext -j$(nproc)
- name: Build all
# Note: the Makefile picks up BUILD_TYPE and CARGO_PROFILE from the env variables
run: mold -run make ${make_vars} all -j$(nproc) CARGO_BUILD_FLAGS="$CARGO_FLAGS"
- name: Build walproposer-lib
run: mold -run make ${make_vars} walproposer-lib -j$(nproc)
- name: Run cargo build
env:
WITH_TESTS: ${{ inputs.sanitizers != 'enabled' && '--tests' || '' }}
- name: Build unit tests
if: inputs.sanitizers != 'enabled'
run: |
export ASAN_OPTIONS=detect_leaks=0
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins ${WITH_TESTS}
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_PROFILE --tests
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
@@ -223,7 +211,7 @@ jobs:
# Install target binaries
mkdir -p /tmp/neon/bin/
binaries=$(
${cov_prefix} cargo metadata $CARGO_FEATURES --format-version=1 --no-deps |
${cov_prefix} cargo metadata $CARGO_FLAGS --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
)
for bin in $binaries; do
@@ -240,7 +228,7 @@ jobs:
mkdir -p /tmp/neon/test_bin/
test_exe_paths=$(
${cov_prefix} cargo test $CARGO_FLAGS $CARGO_FEATURES --message-format=json --no-run |
${cov_prefix} cargo test $CARGO_FLAGS $CARGO_PROFILE --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
for bin in $test_exe_paths; do
@@ -274,10 +262,10 @@ jobs:
export LD_LIBRARY_PATH
#nextest does not yet support running doctests
${cov_prefix} cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
${cov_prefix} cargo test --doc $CARGO_FLAGS $CARGO_PROFILE
# run all non-pageserver tests
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E '!package(pageserver)'
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E '!package(pageserver)'
# run pageserver tests
# (When developing new pageserver features gated by config fields, we commonly make the rust
@@ -286,13 +274,13 @@ jobs:
# pageserver tests from non-pageserver tests cuts down the time it takes for this CI step.)
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=tokio-epoll-uring \
${cov_prefix} \
cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E 'package(pageserver)'
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_s3)'
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E 'package(remote_storage)' -E 'test(test_real_s3)'
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
@@ -301,17 +289,17 @@ jobs:
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E 'package(remote_storage)' -E 'test(test_real_azure)'
- name: Install postgres binaries
run: |
# Use tar to copy files matching the pattern, preserving the paths in the destionation
tar c \
pg_install/v* \
pg_install/build/*/src/test/regress/*.so \
pg_install/build/*/src/test/regress/pg_regress \
pg_install/build/*/src/test/isolation/isolationtester \
pg_install/build/*/src/test/isolation/pg_isolation_regress \
build/*/src/test/regress/*.so \
build/*/src/test/regress/pg_regress \
build/*/src/test/isolation/isolationtester \
build/*/src/test/isolation/pg_isolation_regress \
| tar x -C /tmp/neon
- name: Upload Neon artifact
@@ -379,7 +367,7 @@ jobs:
- name: Pytest regression tests
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
uses: ./.github/actions/run-python-test-set
timeout-minutes: ${{ inputs.sanitizers != 'enabled' && 75 || 180 }}
timeout-minutes: ${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 75 || 180 }}
with:
build_type: ${{ inputs.build-type }}
test_selection: regress
@@ -387,14 +375,14 @@ jobs:
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
rerun_failed: ${{ inputs.test-run-count == 1 }}
rerun_failed: ${{ inputs.rerun-failed }}
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
extra_params: --session-timeout=${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -110,7 +110,7 @@ jobs:
build-walproposer-lib:
if: |
inputs.pg_versions != '[]' || inputs.rebuild_everything ||
contains(inputs.pg_versions, 'v17') || inputs.rebuild_everything ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
@@ -144,7 +144,7 @@ jobs:
id: cache_walproposer_lib
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
path: pg_install/build/walproposer-lib
path: build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Checkout submodule vendor/postgres-v17
@@ -169,11 +169,11 @@ jobs:
run:
make walproposer-lib -j$(sysctl -n hw.ncpu)
- name: Upload "pg_install/build/walproposer-lib" artifact
- name: Upload "build/walproposer-lib" artifact
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: pg_install--build--walproposer-lib
path: pg_install/build/walproposer-lib
name: build--walproposer-lib
path: build/walproposer-lib
# The artifact is supposed to be used by the next job in the same workflow,
# so theres no need to store it for too long.
retention-days: 1
@@ -226,11 +226,11 @@ jobs:
name: pg_install--v17
path: pg_install/v17
- name: Download "pg_install/build/walproposer-lib" artifact
- name: Download "build/walproposer-lib" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
name: pg_install--build--walproposer-lib
path: pg_install/build/walproposer-lib
name: build--walproposer-lib
path: build/walproposer-lib
# `actions/download-artifact` doesn't preserve permissions:
# https://github.com/actions/download-artifact?tab=readme-ov-file#permission-loss

View File

@@ -58,6 +58,7 @@ jobs:
test-cfg: ${{ inputs.pg-versions }}
test-selection: ${{ inputs.test-selection }}
test-run-count: ${{ fromJson(inputs.run-count) }}
rerun-failed: false
secrets: inherit
create-test-report:

View File

@@ -199,6 +199,28 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
secrets: inherit
validate-compute-manifest:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up Node.js
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: '24'
- name: Validate manifest against schema
run: |
make -C compute manifest-schema-validation
build-and-test-locally:
needs: [ meta, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
@@ -648,7 +670,7 @@ jobs:
ghcr.io/neondatabase/neon:${{ needs.meta.outputs.build-tag }}-bookworm-arm64
compute-node-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
needs: [ check-permissions, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -721,7 +743,6 @@ jobs:
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
provenance: false
push: true
@@ -741,7 +762,6 @@ jobs:
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
provenance: false
push: true

View File

@@ -0,0 +1,151 @@
name: Build and Test Fully
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 3 * * *' # run once a day, timezone is utc
workflow_dispatch:
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
jobs:
tag:
runs-on: [ self-hosted, small ]
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
- name: Get build tag
run: |
echo run:$GITHUB_RUN_ID
echo ref:$GITHUB_REF_NAME
echo rev:$(git rev-list --count HEAD)
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
echo "tag=release-compute-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release', 'release-proxy', 'release-compute'"
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
fi
shell: bash
id: build-tag
build-build-tools-image:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
build-and-test-locally:
needs: [ tag, build-build-tools-image ]
strategy:
fail-fast: false
matrix:
arch: [ x64, arm64 ]
build-type: [ debug, release ]
uses: ./.github/workflows/_build-and-test-locally.yml
with:
arch: ${{ matrix.arch }}
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
rerun-failed: false
test-cfg: '[{"pg_version":"v14", "lfc_state": "with-lfc"},
{"pg_version":"v15", "lfc_state": "with-lfc"},
{"pg_version":"v16", "lfc_state": "with-lfc"},
{"pg_version":"v17", "lfc_state": "with-lfc"},
{"pg_version":"v14", "lfc_state": "without-lfc"},
{"pg_version":"v15", "lfc_state": "without-lfc"},
{"pg_version":"v16", "lfc_state": "without-lfc"},
{"pg_version":"v17", "lfc_state": "withouts-lfc"}]'
secrets: inherit
create-test-report:
needs: [ build-and-test-locally, build-build-tools-image ]
if: ${{ !cancelled() }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
if: ${{ !cancelled() }}
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const report = {
reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const coverage = {}
const script = require("./scripts/comment-test-report.js")
await script({
github,
context,
fetch,
report,
coverage,
})

View File

@@ -79,6 +79,7 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
rerun-failed: false
test-cfg: '[{"pg_version":"v17"}]'
sanitizers: enabled
secrets: inherit

View File

@@ -33,11 +33,19 @@ jobs:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
# test only read-only custom scripts in new branch without database maintenance
- target: new_branch
custom_scripts: select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3
test_maintenance: false
# test all custom scripts in new branch with database maintenance
- target: new_branch
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
test_maintenance: true
# test all custom scripts in reuse branch with database maintenance
- target: reuse_branch
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
test_maintenance: true
max-parallel: 1 # we want to run each benchmark sequentially to not have noisy neighbors on shared storage (PS, SK)
permissions:
contents: write
statuses: write
@@ -145,6 +153,7 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark database maintenance
if: ${{ matrix.test_maintenance == 'true' }}
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}

175
.github/workflows/large_oltp_growth.yml vendored Normal file
View File

@@ -0,0 +1,175 @@
name: large oltp growth
# workflow to grow the reuse branch of large oltp benchmark continuously (about 16 GB per run)
on:
# uncomment to run on push for debugging your PR
# push:
# branches: [ bodobolero/increase_large_oltp_workload ]
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 6 * * *' # 06:00 UTC
- cron: '0 8 * * *' # 08:00 UTC
- cron: '0 10 * * *' # 10:00 UTC
- cron: '0 12 * * *' # 12:00 UTC
- cron: '0 14 * * *' # 14:00 UTC
- cron: '0 16 * * *' # 16:00 UTC
workflow_dispatch: # adds ability to run this manually
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow globally because we need dedicated resources which only exist once
group: large-oltp-growth
cancel-in-progress: true
permissions:
contents: read
jobs:
oltp:
strategy:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
# for now only grow the reuse branch, not the other branches.
- target: reuse_branch
custom_scripts:
- grow_action_blocks.sql
- grow_action_kwargs.sql
- grow_device_fingerprint_event.sql
- grow_edges.sql
- grow_hotel_rate_mapping.sql
- grow_ocr_pipeline_results_version.sql
- grow_priceline_raw_response.sql
- grow_relabled_transactions.sql
- grow_state_values.sql
- grow_values.sql
- grow_vertices.sql
- update_accounting_coding_body_tracking_category_selection.sql
- update_action_blocks.sql
- update_action_kwargs.sql
- update_denormalized_approval_workflow.sql
- update_device_fingerprint_event.sql
- update_edges.sql
- update_heron_transaction_enriched_log.sql
- update_heron_transaction_enrichment_requests.sql
- update_hotel_rate_mapping.sql
- update_incoming_webhooks.sql
- update_manual_transaction.sql
- update_ml_receipt_matching_log.sql
- update_ocr_pipeine_results_version.sql
- update_orc_pipeline_step_results.sql
- update_orc_pipeline_step_results_version.sql
- update_priceline_raw_response.sql
- update_quickbooks_transactions.sql
- update_raw_finicity_transaction.sql
- update_relabeled_transactions.sql
- update_state_values.sql
- update_stripe_authorization_event_log.sql
- update_transaction.sql
- update_values.sql
- update_vertices.sql
max-parallel: 1 # we want to run each growth workload sequentially (for now there is just one)
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "1h"
TEST_PGBENCH_CUSTOM_SCRIPTS: ${{ join(matrix.custom_scripts, ' ') }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
PG_VERSION: 16 # pre-determined by pre-determined project
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
PLATFORM: ${{ matrix.target }}
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials # necessary to download artefacts
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours is currently max associated with IAM role
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Set up Connection String
id: set-up-connstr
run: |
case "${{ matrix.target }}" in
reuse_branch)
CONNSTR=${{ secrets.BENCHMARK_LARGE_OLTP_REUSE_CONNSTR }}
;;
*)
echo >&2 "Unknown target=${{ matrix.target }}"
exit 1
;;
esac
CONNSTR_WITHOUT_POOLER="${CONNSTR//-pooler/}"
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
echo "connstr_without_pooler=${CONNSTR_WITHOUT_POOLER}" >> $GITHUB_OUTPUT
- name: pgbench with custom-scripts
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: true
extra_params: -m remote_cluster --timeout 7200 -k test_perf_oltp_large_tenant_growth
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Periodic large oltp tenant growth increase: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

83
.github/workflows/proxy-benchmark.yml vendored Normal file
View File

@@ -0,0 +1,83 @@
name: Periodic proxy performance test on unit-perf hetzner runner
on:
push: # TODO: remove after testing
branches:
- test-proxy-bench # Runs on pushes to branches starting with test-proxy-bench
# schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
# - cron: '0 5 * * *' # Runs at 5 UTC once a day
workflow_dispatch: # adds an ability to run this manually
defaults:
run:
shell: bash -euo pipefail {0}
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: false
permissions:
contents: read
jobs:
run_periodic_proxybench_test:
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
runs-on: [self-hosted, unit-perf]
timeout-minutes: 60 # 1h timeout
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Checkout proxy-bench Repo
uses: actions/checkout@v4
with:
repository: neondatabase/proxy-bench
path: proxy-bench
- name: Set up the environment which depends on $RUNNER_TEMP on nvme drive
id: set-env
shell: bash -euxo pipefail {0}
run: |
PROXY_BENCH_PATH=$(realpath ./proxy-bench)
{
echo "PROXY_BENCH_PATH=$PROXY_BENCH_PATH"
echo "NEON_DIR=${RUNNER_TEMP}/neon"
echo "TEST_OUTPUT=${PROXY_BENCH_PATH}/test_output"
echo ""
} >> "$GITHUB_ENV"
- name: Run proxy-bench
run: ./${PROXY_BENCH_PATH}/run.sh
- name: Ingest Bench Results # neon repo script
if: success()
run: |
mkdir -p $TEST_OUTPUT
python $NEON_DIR/scripts/proxy_bench_results_ingest.py --out $TEST_OUTPUT
- name: Push Metrics to Proxy perf database
if: success()
env:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PROXY_TEST_RESULT_CONNSTR }}"
REPORT_FROM: $TEST_OUTPUT
run: $NEON_DIR/scripts/generate_and_push_perf_report.sh
- name: Docker cleanup
run: docker compose down
- name: Notify Failure
if: failure()
run: echo "Proxy bench job failed" && exit 1

1
.gitignore vendored
View File

@@ -1,4 +1,5 @@
/artifact_cache
/build
/pg_install
/target
/tmp_check

64
Cargo.lock generated
View File

@@ -753,6 +753,7 @@ dependencies = [
"axum",
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
"headers",
"http 1.1.0",
@@ -761,6 +762,8 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"serde_html_form",
"serde_path_to_error",
"tower 0.5.2",
"tower-layer",
"tower-service",
@@ -900,12 +903,6 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.7"
@@ -1297,7 +1294,7 @@ dependencies = [
"aws-smithy-types",
"axum",
"axum-extra",
"base64 0.13.1",
"base64 0.22.1",
"bytes",
"camino",
"cfg-if",
@@ -1423,7 +1420,7 @@ name = "control_plane"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.13.1",
"base64 0.22.1",
"camino",
"clap",
"comfy-table",
@@ -4258,6 +4255,7 @@ dependencies = [
"tokio-util",
"tonic 0.13.1",
"tracing",
"url",
"utils",
"workspace_hack",
]
@@ -4337,6 +4335,7 @@ dependencies = [
"postgres_backend",
"postgres_connection",
"postgres_ffi",
"postgres_ffi_types",
"postgres_initdb",
"posthog_client_lite",
"pprof",
@@ -4406,7 +4405,7 @@ dependencies = [
"nix 0.30.1",
"once_cell",
"postgres_backend",
"postgres_ffi",
"postgres_ffi_types",
"rand 0.8.5",
"remote_storage",
"reqwest",
@@ -4468,11 +4467,16 @@ dependencies = [
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"futures",
"pageserver_api",
"postgres_ffi",
"prost 0.13.5",
"strum",
"strum_macros",
"thiserror 1.0.69",
"tokio",
"tonic 0.13.1",
"tonic-build",
"utils",
@@ -4815,7 +4819,7 @@ dependencies = [
name = "postgres-protocol2"
version = "0.1.0"
dependencies = [
"base64 0.20.0",
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
@@ -4892,6 +4896,7 @@ dependencies = [
"memoffset 0.9.0",
"once_cell",
"postgres",
"postgres_ffi_types",
"pprof",
"regex",
"serde",
@@ -4900,6 +4905,14 @@ dependencies = [
"utils",
]
[[package]]
name = "postgres_ffi_types"
version = "0.1.0"
dependencies = [
"thiserror 1.0.69",
"workspace_hack",
]
[[package]]
name = "postgres_initdb"
version = "0.1.0"
@@ -5187,7 +5200,7 @@ dependencies = [
"aws-config",
"aws-sdk-iam",
"aws-sigv4",
"base64 0.13.1",
"base64 0.22.1",
"bstr",
"bytes",
"camino",
@@ -6422,6 +6435,19 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "serde_html_form"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
dependencies = [
"form_urlencoded",
"indexmap 2.9.0",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_json"
version = "1.0.125"
@@ -6478,15 +6504,17 @@ dependencies = [
[[package]]
name = "serde_with"
version = "2.3.3"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07ff71d2c147a7b57362cead5e22f772cd52f6ab31cfcd9edcd7f6aeb2a0afbe"
checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa"
dependencies = [
"base64 0.13.1",
"base64 0.22.1",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.9.0",
"serde",
"serde_derive",
"serde_json",
"serde_with_macros",
"time",
@@ -6494,9 +6522,9 @@ dependencies = [
[[package]]
name = "serde_with_macros"
version = "2.3.3"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "881b6f881b17d13214e5d494c939ebab463d01264ce1811e9d4ac3a882e7695f"
checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e"
dependencies = [
"darling",
"proc-macro2",
@@ -7546,6 +7574,7 @@ dependencies = [
"axum",
"base64 0.22.1",
"bytes",
"flate2",
"h2 0.4.4",
"http 1.1.0",
"http-body 1.0.0",
@@ -7565,6 +7594,7 @@ dependencies = [
"tower-layer",
"tower-service",
"tracing",
"zstd",
]
[[package]]
@@ -8144,6 +8174,7 @@ dependencies = [
"futures",
"pageserver_api",
"postgres_ffi",
"postgres_ffi_types",
"pprof",
"prost 0.13.5",
"remote_storage",
@@ -8567,7 +8598,6 @@ dependencies = [
"anyhow",
"axum",
"axum-core",
"base64 0.13.1",
"base64 0.21.7",
"base64ct",
"bytes",

View File

@@ -22,6 +22,7 @@ members = [
"libs/http-utils",
"libs/pageserver_api",
"libs/postgres_ffi",
"libs/postgres_ffi_types",
"libs/safekeeper_api",
"libs/desim",
"libs/neon-shmem",
@@ -71,8 +72,8 @@ aws-credential-types = "1.2.0"
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
axum-extra = { version = "0.10.0", features = ["typed-header"] }
base64 = "0.13.0"
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
base64 = "0.22"
bincode = "1.3"
bindgen = "0.71"
bit_field = "0.10.2"
@@ -171,7 +172,7 @@ sentry = { version = "0.37", default-features = false, features = ["backtrace",
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_path_to_error = "0.1"
serde_with = { version = "2.0", features = [ "base64" ] }
serde_with = { version = "3", features = [ "base64" ] }
serde_assert = "0.5.0"
sha2 = "0.10.2"
signal-hook = "0.3"
@@ -199,7 +200,7 @@ tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots", "zstd"] }
tonic-reflection = { version = "0.13.1", features = ["server"] }
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
@@ -259,6 +260,7 @@ pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
postgres_ffi_types = { version = "0.1", path = "./libs/postgres_ffi_types/" }
postgres_initdb = { path = "./libs/postgres_initdb" }
posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }

View File

@@ -5,8 +5,6 @@
ARG REPOSITORY=ghcr.io/neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG DEFAULT_PG_VERSION=17
ARG STABLE_PG_VERSION=16
ARG DEBIAN_VERSION=bookworm
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
@@ -47,7 +45,6 @@ COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
ENV BUILD_TYPE=release
RUN set -e \
&& mold -run make -j $(nproc) -s neon-pg-ext \
&& rm -rf pg_install/build \
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
# Prepare cargo-chef recipe
@@ -63,14 +60,11 @@ FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG STABLE_PG_VERSION
COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib
COPY --from=plan /home/nonroot/recipe.json recipe.json
ARG ADDITIONAL_RUSTFLAGS=""
@@ -97,7 +91,6 @@ RUN set -e \
# Build final image
#
FROM $BASE_IMAGE_SHA
ARG DEFAULT_PG_VERSION
WORKDIR /data
RUN set -e \
@@ -107,8 +100,6 @@ RUN set -e \
libreadline-dev \
libseccomp-dev \
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
openssl \
unzip \
curl \

157
Makefile
View File

@@ -1,8 +1,18 @@
ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
# Where to install Postgres, default is ./pg_install, maybe useful for package
# managers.
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
# CARGO_BUILD_FLAGS: Extra flags to pass to `cargo build`. `--locked`
# and `--features testing` are popular examples.
#
# CARGO_PROFILE: You can also set to override the cargo profile to
# use. By default, it is derived from BUILD_TYPE.
# All intermediate build artifacts are stored here.
BUILD_DIR := build
ICU_PREFIX_DIR := /usr/local/icu
#
@@ -16,12 +26,12 @@ ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
PG_CFLAGS += -O2 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
# Unfortunately, `--profile=...` is a nightly feature
CARGO_BUILD_FLAGS += --release
CARGO_PROFILE ?= --profile=release
else ifeq ($(BUILD_TYPE),debug)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
PG_CFLAGS += -O0 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
CARGO_PROFILE ?= --profile=dev
else
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
endif
@@ -93,7 +103,7 @@ all: neon postgres neon-pg-ext
.PHONY: neon
neon: postgres-headers walproposer-lib cargo-target-dir
+@echo "Compiling Neon"
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS)
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS) $(CARGO_PROFILE)
.PHONY: cargo-target-dir
cargo-target-dir:
# https://github.com/rust-lang/cargo/issues/14281
@@ -104,21 +114,20 @@ cargo-target-dir:
# Some rules are duplicated for Postgres v14 and 15. We may want to refactor
# to avoid the duplication in the future, but it's tolerable for now.
#
$(POSTGRES_INSTALL_DIR)/build/%/config.status:
mkdir -p $(POSTGRES_INSTALL_DIR)
test -e $(POSTGRES_INSTALL_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(POSTGRES_INSTALL_DIR)/CACHEDIR.TAG
$(BUILD_DIR)/%/config.status:
mkdir -p $(BUILD_DIR)
test -e $(BUILD_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(BUILD_DIR)/CACHEDIR.TAG
+@echo "Configuring Postgres $* build"
@test -s $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure || { \
echo "\nPostgres submodule not found in $(ROOT_PROJECT_DIR)/vendor/postgres-$*/, execute "; \
echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \
exit 1; }
mkdir -p $(POSTGRES_INSTALL_DIR)/build/$*
mkdir -p $(BUILD_DIR)/$*
VERSION=$*; \
EXTRA_VERSION=$$(cd $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION && git rev-parse HEAD); \
(cd $(POSTGRES_INSTALL_DIR)/build/$$VERSION && \
(cd $(BUILD_DIR)/$$VERSION && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION/configure \
CFLAGS='$(PG_CFLAGS)' LDFLAGS='$(PG_LDFLAGS)' \
$(PG_CONFIGURE_OPTS) --with-extra-version=" ($$EXTRA_VERSION)" \
@@ -130,96 +139,54 @@ $(POSTGRES_INSTALL_DIR)/build/%/config.status:
# the "build-all-versions" entry points) where direct mention of PostgreSQL
# versions is used.
.PHONY: postgres-configure-v17
postgres-configure-v17: $(POSTGRES_INSTALL_DIR)/build/v17/config.status
postgres-configure-v17: $(BUILD_DIR)/v17/config.status
.PHONY: postgres-configure-v16
postgres-configure-v16: $(POSTGRES_INSTALL_DIR)/build/v16/config.status
postgres-configure-v16: $(BUILD_DIR)/v16/config.status
.PHONY: postgres-configure-v15
postgres-configure-v15: $(POSTGRES_INSTALL_DIR)/build/v15/config.status
postgres-configure-v15: $(BUILD_DIR)/v15/config.status
.PHONY: postgres-configure-v14
postgres-configure-v14: $(POSTGRES_INSTALL_DIR)/build/v14/config.status
postgres-configure-v14: $(BUILD_DIR)/v14/config.status
# Install the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)/<version>/include
.PHONY: postgres-headers-%
postgres-headers-%: postgres-configure-%
+@echo "Installing PostgreSQL $* headers"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/include MAKELEVEL=0 install
$(MAKE) -C $(BUILD_DIR)/$*/src/include MAKELEVEL=0 install
# Compile and install PostgreSQL
.PHONY: postgres-%
postgres-%: postgres-configure-% \
postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers`
+@echo "Compiling PostgreSQL $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 install
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 install
+@echo "Compiling libpq $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq install
$(MAKE) -C $(BUILD_DIR)/$*/src/interfaces/libpq install
+@echo "Compiling pg_prewarm $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_prewarm install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_prewarm install
+@echo "Compiling pg_buffercache $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_buffercache install
+@echo "Compiling pg_visibility $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_visibility install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_visibility install
+@echo "Compiling pageinspect $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pageinspect install
+@echo "Compiling pg_trgm $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_trgm install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_trgm install
+@echo "Compiling amcheck $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/amcheck install
+@echo "Compiling test_decoding $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install
.PHONY: postgres-clean-%
postgres-clean-%:
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq clean
$(MAKE) -C $(BUILD_DIR)/$*/contrib/test_decoding install
.PHONY: postgres-check-%
postgres-check-%: postgres-%
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 check
.PHONY: neon-pg-ext-%
neon-pg-ext-%: postgres-%
+@echo "Compiling neon $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$*
+@echo "Compiling neon-specific Postgres extensions for $*"
mkdir -p $(BUILD_DIR)/pgxn-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install
+@echo "Compiling neon_walredo $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install
+@echo "Compiling neon_rmgr $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_rmgr/Makefile install
+@echo "Compiling neon_test_utils $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install
+@echo "Compiling neon_utils $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-utils-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
.PHONY: neon-pg-clean-ext-%
neon-pg-clean-ext-%:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean
-C $(BUILD_DIR)/pgxn-$*\
-f $(ROOT_PROJECT_DIR)/pgxn/Makefile install
# Build walproposer as a static library. walproposer source code is located
# in the pgxn/neon directory.
@@ -233,15 +200,15 @@ neon-pg-clean-ext-%:
.PHONY: walproposer-lib
walproposer-lib: neon-pg-ext-v17
+@echo "Compiling walproposer-lib"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
mkdir -p $(BUILD_DIR)/walproposer-lib
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config COPT='$(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \
-C $(BUILD_DIR)/walproposer-lib \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgport.a \
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(BUILD_DIR)/walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(BUILD_DIR)/walproposer-lib
$(AR) d $(BUILD_DIR)/walproposer-lib/libpgport.a \
pg_strong_random.o
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \
$(AR) d $(BUILD_DIR)/walproposer-lib/libpgcommon.a \
checksum_helper.o \
cryptohash_openssl.o \
hmac_openssl.o \
@@ -249,16 +216,10 @@ walproposer-lib: neon-pg-ext-v17
parse_manifest.o \
scram-common.o
ifeq ($(UNAME_S),Linux)
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \
$(AR) d $(BUILD_DIR)/walproposer-lib/libpgcommon.a \
pg_crc32c.o
endif
.PHONY: walproposer-lib-clean
walproposer-lib-clean:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
.PHONY: neon-pg-ext
neon-pg-ext: \
neon-pg-ext-v14 \
@@ -266,13 +227,6 @@ neon-pg-ext: \
neon-pg-ext-v16 \
neon-pg-ext-v17
.PHONY: neon-pg-clean-ext
neon-pg-clean-ext: \
neon-pg-clean-ext-v14 \
neon-pg-clean-ext-v15 \
neon-pg-clean-ext-v16 \
neon-pg-clean-ext-v17
# shorthand to build all Postgres versions
.PHONY: postgres
postgres: \
@@ -288,13 +242,6 @@ postgres-headers: \
postgres-headers-v16 \
postgres-headers-v17
.PHONY: postgres-clean
postgres-clean: \
postgres-clean-v14 \
postgres-clean-v15 \
postgres-clean-v16 \
postgres-clean-v17
.PHONY: postgres-check
postgres-check: \
postgres-check-v14 \
@@ -302,12 +249,6 @@ postgres-check: \
postgres-check-v16 \
postgres-check-v17
# This doesn't remove the effects of 'configure'.
.PHONY: clean
clean: postgres-clean neon-pg-clean-ext
$(MAKE) -C compute clean
$(CARGO_CMD_PREFIX) cargo clean
# This removes everything
.PHONY: distclean
distclean:
@@ -320,7 +261,7 @@ fmt:
postgres-%-pg-bsd-indent: postgres-%
+@echo "Compiling pg_bsd_indent"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/tools/pg_bsd_indent/
$(MAKE) -C $(BUILD_DIR)/$*/src/tools/pg_bsd_indent/
# Create typedef list for the core. Note that generally it should be combined with
# buildfarm one to cover platform specific stuff.
@@ -339,7 +280,7 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list
cat $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/typedefs.list |\
cat - postgres-$*-typedefs.list | sort | uniq > postgres-$*-typedefs-full.list
+@echo note: you might want to run it on selected files/dirs instead.
INDENT=$(POSTGRES_INSTALL_DIR)/build/$*/src/tools/pg_bsd_indent/pg_bsd_indent \
INDENT=$(BUILD_DIR)/$*/src/tools/pg_bsd_indent/pg_bsd_indent \
$(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/pgindent --typedefs postgres-$*-typedefs-full.list \
$(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/ \
--excludes $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/exclude_file_patterns
@@ -350,9 +291,9 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list
neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config COPT='$(COPT)' \
FIND_TYPEDEF=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/find_typedef \
INDENT=$(POSTGRES_INSTALL_DIR)/build/v17/src/tools/pg_bsd_indent/pg_bsd_indent \
INDENT=$(BUILD_DIR)/v17/src/tools/pg_bsd_indent/pg_bsd_indent \
PGINDENT_SCRIPT=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/pgindent/pgindent \
-C $(POSTGRES_INSTALL_DIR)/build/neon-v17 \
-C $(BUILD_DIR)/neon-v17 \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile pgindent

3
compute/.gitignore vendored
View File

@@ -3,3 +3,6 @@ etc/neon_collector.yml
etc/neon_collector_autoscaling.yml
etc/sql_exporter.yml
etc/sql_exporter_autoscaling.yml
# Node.js dependencies
node_modules/

View File

@@ -48,3 +48,11 @@ jsonnetfmt-test:
.PHONY: jsonnetfmt-format
jsonnetfmt-format:
jsonnetfmt --in-place $(jsonnet_files)
.PHONY: manifest-schema-validation
manifest-schema-validation: node_modules
node_modules/.bin/jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
node_modules: package.json
npm install
touch node_modules

View File

@@ -77,9 +77,6 @@
# build_and_test.yml github workflow for how that's done.
ARG PG_VERSION
ARG REPOSITORY=ghcr.io/neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG BUILD_TAG
ARG DEBIAN_VERSION=bookworm
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
@@ -149,8 +146,11 @@ RUN case $DEBIAN_VERSION in \
ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
libclang-dev \
jsonnet \
$VERSION_INSTALLS \
&& apt clean && rm -rf /var/lib/apt/lists/*
&& apt clean && rm -rf /var/lib/apt/lists/* && \
useradd -ms /bin/bash nonroot -b /home
#########################################################################################
#
@@ -1057,17 +1057,10 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
#########################################################################################
#
# Layer "pg build with nonroot user and cargo installed"
# This layer is base and common for layers with `pgrx`
# Layer "build-deps with Rust toolchain installed"
#
#########################################################################################
FROM pg-build AS pg-build-nonroot-with-cargo
ARG PG_VERSION
RUN apt update && \
apt install --no-install-recommends --no-install-suggests -y curl libclang-dev && \
apt clean && rm -rf /var/lib/apt/lists/* && \
useradd -ms /bin/bash nonroot -b /home
FROM build-deps AS build-deps-with-cargo
ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:$PATH"
@@ -1082,13 +1075,29 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init
#########################################################################################
#
# Layer "pg-build with Rust toolchain installed"
# This layer is base and common for layers with `pgrx`
#
#########################################################################################
FROM pg-build AS pg-build-with-cargo
ARG PG_VERSION
ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:$PATH"
USER nonroot
WORKDIR /home/nonroot
COPY --from=build-deps-with-cargo /home/nonroot /home/nonroot
#########################################################################################
#
# Layer "rust extensions"
# This layer is used to build `pgrx` deps
#
#########################################################################################
FROM pg-build-nonroot-with-cargo AS rust-extensions-build
FROM pg-build-with-cargo AS rust-extensions-build
ARG PG_VERSION
RUN case "${PG_VERSION:?}" in \
@@ -1110,7 +1119,7 @@ USER root
# and eventually get merged with `rust-extensions-build`
#
#########################################################################################
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx12
FROM pg-build-with-cargo AS rust-extensions-build-pgrx12
ARG PG_VERSION
RUN cargo install --locked --version 0.12.9 cargo-pgrx && \
@@ -1127,7 +1136,7 @@ USER root
# and eventually get merged with `rust-extensions-build`
#
#########################################################################################
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
FROM pg-build-with-cargo AS rust-extensions-build-pgrx14
ARG PG_VERSION
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
@@ -1144,10 +1153,12 @@ USER root
FROM build-deps AS pgrag-src
ARG PG_VERSION
WORKDIR /ext-src
COPY compute/patches/onnxruntime.patch .
RUN wget https://github.com/microsoft/onnxruntime/archive/refs/tags/v1.18.1.tar.gz -O onnxruntime.tar.gz && \
mkdir onnxruntime-src && cd onnxruntime-src && tar xzf ../onnxruntime.tar.gz --strip-components=1 -C . && \
patch -p1 < /ext-src/onnxruntime.patch && \
echo "#nothing to test here" > neon-test.sh
RUN wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.1.2.tar.gz -O pgrag.tar.gz && \
@@ -1621,18 +1632,7 @@ FROM pg-build AS neon-ext-build
ARG PG_VERSION
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon_test_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon_rmgr \
-s install
RUN make -j $(getconf _NPROCESSORS_ONLN) -C pgxn -s install-compute
#########################################################################################
#
@@ -1722,7 +1722,7 @@ FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
FROM build-deps-with-cargo AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
@@ -1732,7 +1732,7 @@ COPY --chown=nonroot . .
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
--mount=type=cache,uid=1000,target=/home/nonroot/.cargo/git \
--mount=type=cache,uid=1000,target=/home/nonroot/target \
mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy && \
cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy && \
mkdir target-bin && \
cp target/release-line-debug-size-lto/compute_ctl \
target/release-line-debug-size-lto/fast_import \
@@ -1826,10 +1826,11 @@ RUN rm /usr/local/pgsql/lib/lib*.a
# Preprocess the sql_exporter configuration files
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS sql_exporter_preprocessor
FROM build-deps AS sql_exporter_preprocessor
ARG PG_VERSION
USER nonroot
WORKDIR /home/nonroot
COPY --chown=nonroot compute compute

View File

@@ -21,6 +21,8 @@ unix_socket_dir=/tmp/
unix_socket_mode=0777
; required for pgbouncer_exporter
ignore_startup_parameters=extra_float_digits
; pidfile for graceful termination
pidfile=/tmp/pgbouncer.pid
;; Disable connection logging. It produces a lot of logs that no one looks at,
;; and we can get similar log entries from the proxy too. We had incidents in

View File

@@ -0,0 +1,209 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Neon Compute Manifest Schema",
"description": "Schema for Neon compute node configuration manifest",
"type": "object",
"properties": {
"pg_settings": {
"type": "object",
"properties": {
"common": {
"type": "object",
"properties": {
"client_connection_check_interval": {
"type": "string",
"description": "Check for client disconnection interval in milliseconds"
},
"effective_io_concurrency": {
"type": "string",
"description": "Effective IO concurrency setting"
},
"fsync": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to force fsync to disk"
},
"hot_standby": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether hot standby is enabled"
},
"idle_in_transaction_session_timeout": {
"type": "string",
"description": "Timeout for idle transactions in milliseconds"
},
"listen_addresses": {
"type": "string",
"description": "Addresses to listen on"
},
"log_connections": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log connections"
},
"log_disconnections": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log disconnections"
},
"log_temp_files": {
"type": "string",
"description": "Size threshold for logging temporary files in KB"
},
"log_error_verbosity": {
"type": "string",
"enum": ["terse", "verbose", "default"],
"description": "Error logging verbosity level"
},
"log_min_error_statement": {
"type": "string",
"description": "Minimum error level for statement logging"
},
"maintenance_io_concurrency": {
"type": "string",
"description": "Maintenance IO concurrency setting"
},
"max_connections": {
"type": "string",
"description": "Maximum number of connections"
},
"max_replication_flush_lag": {
"type": "string",
"description": "Maximum replication flush lag"
},
"max_replication_slots": {
"type": "string",
"description": "Maximum number of replication slots"
},
"max_replication_write_lag": {
"type": "string",
"description": "Maximum replication write lag"
},
"max_wal_senders": {
"type": "string",
"description": "Maximum number of WAL senders"
},
"max_wal_size": {
"type": "string",
"description": "Maximum WAL size"
},
"neon.unstable_extensions": {
"type": "string",
"description": "List of unstable extensions"
},
"neon.protocol_version": {
"type": "string",
"description": "Neon protocol version"
},
"password_encryption": {
"type": "string",
"description": "Password encryption method"
},
"restart_after_crash": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to restart after crash"
},
"superuser_reserved_connections": {
"type": "string",
"description": "Number of reserved connections for superuser"
},
"synchronous_standby_names": {
"type": "string",
"description": "Names of synchronous standby servers"
},
"wal_keep_size": {
"type": "string",
"description": "WAL keep size"
},
"wal_level": {
"type": "string",
"description": "WAL level"
},
"wal_log_hints": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log hints in WAL"
},
"wal_sender_timeout": {
"type": "string",
"description": "WAL sender timeout in milliseconds"
}
},
"required": [
"client_connection_check_interval",
"effective_io_concurrency",
"fsync",
"hot_standby",
"idle_in_transaction_session_timeout",
"listen_addresses",
"log_connections",
"log_disconnections",
"log_temp_files",
"log_error_verbosity",
"log_min_error_statement",
"maintenance_io_concurrency",
"max_connections",
"max_replication_flush_lag",
"max_replication_slots",
"max_replication_write_lag",
"max_wal_senders",
"max_wal_size",
"neon.unstable_extensions",
"neon.protocol_version",
"password_encryption",
"restart_after_crash",
"superuser_reserved_connections",
"synchronous_standby_names",
"wal_keep_size",
"wal_level",
"wal_log_hints",
"wal_sender_timeout"
]
},
"replica": {
"type": "object",
"properties": {
"hot_standby": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether hot standby is enabled for replicas"
}
},
"required": ["hot_standby"]
},
"per_version": {
"type": "object",
"patternProperties": {
"^1[4-7]$": {
"type": "object",
"properties": {
"common": {
"type": "object",
"properties": {
"io_combine_limit": {
"type": "string",
"description": "IO combine limit"
}
}
},
"replica": {
"type": "object",
"properties": {
"recovery_prefetch": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to enable recovery prefetch for PostgreSQL replicas"
}
}
}
}
}
}
}
},
"required": ["common", "replica", "per_version"]
}
},
"required": ["pg_settings"]
}

View File

@@ -105,17 +105,17 @@ pg_settings:
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
16:
common:
common: {}
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
15:
common:
common: {}
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
14:
common:
replica:
common: {}
replica: {}

37
compute/package-lock.json generated Normal file
View File

@@ -0,0 +1,37 @@
{
"name": "neon-compute",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "neon-compute",
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
},
"node_modules/@sourcemeta/jsonschema": {
"version": "9.3.4",
"resolved": "https://registry.npmjs.org/@sourcemeta/jsonschema/-/jsonschema-9.3.4.tgz",
"integrity": "sha512-hkujfkZAIGXUs4U//We9faZW8LZ4/H9LqagRYsFSulH/VLcKPNhZyCTGg7AhORuzm27zqENvKpnX4g2FzudYFw==",
"cpu": [
"x64",
"arm64"
],
"license": "AGPL-3.0",
"os": [
"darwin",
"linux",
"win32"
],
"bin": {
"jsonschema": "cli.js"
},
"engines": {
"node": ">=16"
},
"funding": {
"url": "https://github.com/sponsors/sourcemeta"
}
}
}
}

7
compute/package.json Normal file
View File

@@ -0,0 +1,7 @@
{
"name": "neon-compute",
"private": true,
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
}

View File

@@ -0,0 +1,15 @@
diff --git a/cmake/deps.txt b/cmake/deps.txt
index d213b09034..229de2ebf0 100644
--- a/cmake/deps.txt
+++ b/cmake/deps.txt
@@ -22,7 +22,9 @@ dlpack;https://github.com/dmlc/dlpack/archive/refs/tags/v0.6.zip;4d565dd2e5b3132
# it contains changes on top of 3.4.0 which are required to fix build issues.
# Until the 3.4.1 release this is the best option we have.
# Issue link: https://gitlab.com/libeigen/eigen/-/issues/2744
-eigen;https://gitlab.com/libeigen/eigen/-/archive/e7248b26a1ed53fa030c5c459f7ea095dfd276ac/eigen-e7248b26a1ed53fa030c5c459f7ea095dfd276ac.zip;be8be39fdbc6e60e94fa7870b280707069b5b81a
+# Moved to github mirror to avoid gitlab issues.Add commentMore actions
+# Issue link: https://github.com/bazelbuild/bazel-central-registry/issues/4355
+eigen;https://github.com/eigen-mirror/eigen/archive/e7248b26a1ed53fa030c5c459f7ea095dfd276ac/eigen-e7248b26a1ed53fa030c5c459f7ea095dfd276ac.zip;61418a349000ba7744a3ad03cf5071f22ebf860a
flatbuffers;https://github.com/google/flatbuffers/archive/refs/tags/v23.5.26.zip;59422c3b5e573dd192fead2834d25951f1c1670c
fp16;https://github.com/Maratyszcza/FP16/archive/0a92994d729ff76a58f692d3028ca1b64b145d91.zip;b985f6985a05a1c03ff1bb71190f66d8f98a1494
fxdiv;https://github.com/Maratyszcza/FXdiv/archive/63058eff77e11aa15bf531df5dd34395ec3017c8.zip;a5658f4036402dbca7cebee32be57fb8149811e1

View File

@@ -124,6 +124,10 @@ struct Cli {
/// Interval in seconds for collecting installed extensions statistics
#[arg(long, default_value = "3600")]
pub installed_extensions_collection_interval: u64,
/// Run in development mode, skipping VM-specific operations like process termination
#[arg(long, action = clap::ArgAction::SetTrue)]
pub dev: bool,
}
impl Cli {
@@ -159,7 +163,7 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
runtime.block_on(init())?;
runtime.block_on(init(cli.dev))?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -198,13 +202,13 @@ fn main() -> Result<()> {
deinit_and_exit(exit_code);
}
async fn init() -> Result<()> {
async fn init(dev_mode: bool) -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
for sig in signals.forever() {
handle_exit_signal(sig);
handle_exit_signal(sig, dev_mode);
}
});
@@ -263,9 +267,9 @@ fn deinit_and_exit(exit_code: Option<i32>) -> ! {
/// When compute_ctl is killed, send also termination signal to sync-safekeepers
/// to prevent leakage. TODO: it is better to convert compute_ctl to async and
/// wait for termination which would be easy then.
fn handle_exit_signal(sig: i32) {
fn handle_exit_signal(sig: i32, dev_mode: bool) {
info!("received {sig} termination signal");
forward_termination_signal();
forward_termination_signal(dev_mode);
exit(1);
}

View File

@@ -486,10 +486,8 @@ async fn cmd_pgdata(
};
let superuser = "cloud_admin";
let destination_connstring = format!(
"host=localhost port={} user={} dbname=neondb",
pg_port, superuser
);
let destination_connstring =
format!("host=localhost port={pg_port} user={superuser} dbname=neondb");
let pgdata_dir = workdir.join("pgdata");
let mut proc = PostgresProcess::new(pgdata_dir.clone(), pg_bin_dir.clone(), pg_lib_dir.clone());

View File

@@ -69,7 +69,7 @@ impl clap::builder::TypedValueParser for S3Uri {
S3Uri::from_str(value_str).map_err(|e| {
clap::Error::raw(
clap::error::ErrorKind::InvalidValue,
format!("Failed to parse S3 URI: {}", e),
format!("Failed to parse S3 URI: {e}"),
)
})
}

View File

@@ -22,7 +22,7 @@ pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<Cat
spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -119,7 +119,7 @@ pub async fn get_database_schema(
_ => {
let mut lines = stderr_reader.lines();
if let Some(line) = lines.next_line().await? {
if line.contains(&format!("FATAL: database \"{}\" does not exist", dbname)) {
if line.contains(&format!("FATAL: database \"{dbname}\" does not exist")) {
return Err(SchemaDumpError::DatabaseDoesNotExist);
}
warn!("pg_dump stderr: {}", line)

View File

@@ -35,6 +35,7 @@ use url::Url;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
use utils::pid_file;
use crate::configurator::launch_configurator;
use crate::disk_quota::set_disk_quota;
@@ -44,6 +45,7 @@ use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
use crate::monitor::launch_monitor;
use crate::pg_helpers::*;
use crate::pgbouncer::*;
use crate::rsyslog::{
PostgresLogsRsyslogConfig, configure_audit_rsyslog, configure_postgres_logs_export,
launch_pgaudit_gc,
@@ -161,6 +163,10 @@ pub struct ComputeState {
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_offload_state: LfcOffloadState,
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
/// mode == ComputeMode::Primary. None otherwise
pub terminate_flush_lsn: Option<Lsn>,
pub metrics: ComputeMetrics,
}
@@ -176,6 +182,7 @@ impl ComputeState {
metrics: ComputeMetrics::default(),
lfc_prewarm_state: LfcPrewarmState::default(),
lfc_offload_state: LfcOffloadState::default(),
terminate_flush_lsn: None,
}
}
@@ -215,6 +222,45 @@ pub struct ParsedSpec {
pub endpoint_storage_token: Option<String>,
}
impl ParsedSpec {
pub fn validate(&self) -> Result<(), String> {
// Only Primary nodes are using safekeeper_connstrings, and at the moment
// this method only validates that part of the specs.
if self.spec.mode != ComputeMode::Primary {
return Ok(());
}
// While it seems like a good idea to check for an odd number of entries in
// the safekeepers connection string, changes to the list of safekeepers might
// incur appending a new server to a list of 3, in which case a list of 4
// entries is okay in production.
//
// Still we want unique entries, and at least one entry in the vector
if self.safekeeper_connstrings.is_empty() {
return Err(String::from("safekeeper_connstrings is empty"));
}
// check for uniqueness of the connection strings in the set
let mut connstrings = self.safekeeper_connstrings.clone();
connstrings.sort();
let mut previous = &connstrings[0];
for current in connstrings.iter().skip(1) {
// duplicate entry?
if current == previous {
return Err(format!(
"duplicate entry in safekeeper_connstrings: {current}!",
));
}
previous = current;
}
Ok(())
}
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
@@ -244,6 +290,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
} else {
spec.safekeeper_connstrings.clone()
};
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
tenant_id
@@ -278,7 +325,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
Ok(ParsedSpec {
let res = ParsedSpec {
spec,
pageserver_connstr,
safekeeper_connstrings,
@@ -287,7 +334,11 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
timeline_id,
endpoint_storage_addr,
endpoint_storage_token,
})
};
// Now check validity of the parsed specification
res.validate()?;
Ok(res)
}
}
@@ -354,14 +405,11 @@ impl ComputeNode {
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
//
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
// as well. After rolling out this code, we can remove this parameter from control plane.
// In the meantime, double-passing is fine, the last value is applied.
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
let options = match conn_conf.get_options() {
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
// Allow the control plane to override any options set by the
// compute
Some(options) => format!("{EXTRA_OPTIONS} {options}"),
None => EXTRA_OPTIONS.to_string(),
};
conn_conf.options(&options);
@@ -489,12 +537,21 @@ impl ComputeNode {
// Reap the postgres process
delay_exit |= this.cleanup_after_postgres_exit()?;
// /terminate returns LSN. If we don't sleep at all, connection will break and we
// won't get result. If we sleep too much, tests will take significantly longer
// and Github Action run will error out
let sleep_duration = if delay_exit {
Duration::from_secs(30)
} else {
Duration::from_millis(300)
};
// If launch failed, keep serving HTTP requests for a while, so the cloud
// control plane can get the actual error.
if delay_exit {
info!("giving control plane 30s to collect the error before shutdown");
std::thread::sleep(Duration::from_secs(30));
}
std::thread::sleep(sleep_duration);
Ok(exit_code)
}
@@ -785,7 +842,7 @@ impl ComputeNode {
self.spawn_extension_stats_task();
if pspec.spec.autoprewarm {
self.prewarm_lfc();
self.prewarm_lfc(None);
}
Ok(())
}
@@ -866,20 +923,25 @@ impl ComputeNode {
// Maybe sync safekeepers again, to speed up next startup
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
let lsn = if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
info!("syncing safekeepers on shutdown");
let storage_auth_token = pspec.storage_auth_token.clone();
let lsn = self.sync_safekeepers(storage_auth_token)?;
info!("synced safekeepers at lsn {lsn}");
}
info!(%lsn, "synced safekeepers");
Some(lsn)
} else {
info!("not primary, not syncing safekeepers");
None
};
let mut delay_exit = false;
let mut state = self.state.lock().unwrap();
if state.status == ComputeStatus::TerminationPending {
state.terminate_flush_lsn = lsn;
if let ComputeStatus::TerminationPending { mode } = state.status {
state.status = ComputeStatus::Terminated;
self.state_changed.notify_all();
// we were asked to terminate gracefully, don't exit to avoid restart
delay_exit = true
delay_exit = mode == compute_api::responses::TerminateMode::Fast
}
drop(state);
@@ -1064,7 +1126,7 @@ impl ComputeNode {
let sk_configs = sk_connstrs.into_iter().map(|connstr| {
// Format connstr
let id = connstr.clone();
let connstr = format!("postgresql://no_user@{}", connstr);
let connstr = format!("postgresql://no_user@{connstr}");
let options = format!(
"-c timeline_id={} tenant_id={}",
pspec.timeline_id, pspec.tenant_id
@@ -1427,7 +1489,7 @@ impl ComputeNode {
let (mut client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -1570,7 +1632,7 @@ impl ComputeNode {
Ok((mut client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
if let Err(e) = handle_migrations(&mut client).await {
@@ -1750,7 +1812,7 @@ impl ComputeNode {
// exit loop
ComputeStatus::Failed
| ComputeStatus::TerminationPending
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Terminated => break 'cert_update,
// wait
@@ -1874,7 +1936,7 @@ impl ComputeNode {
let (client, connection) = connect_result.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
let result = client
@@ -2043,7 +2105,7 @@ LIMIT 100",
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {query}"))?;
}
Ok(())
@@ -2070,7 +2132,7 @@ LIMIT 100",
let version: Option<ExtVersion> = db_client
.query_opt(version_query, &[&ext_name])
.await
.with_context(|| format!("Failed to execute query: {}", version_query))?
.with_context(|| format!("Failed to execute query: {version_query}"))?
.map(|row| row.get(0));
// sanitize the inputs as postgres idents.
@@ -2085,14 +2147,14 @@ LIMIT 100",
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {query}"))?;
} else {
let query =
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {query}"))?;
}
Ok(ext_version)
@@ -2251,12 +2313,68 @@ pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
Ok(())
}
pub fn forward_termination_signal() {
pub fn forward_termination_signal(dev_mode: bool) {
let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
if ss_pid != 0 {
let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
kill(ss_pid, Signal::SIGTERM).ok();
}
if !dev_mode {
// Terminate pgbouncer with SIGKILL
match pid_file::read(PGBOUNCER_PIDFILE.into()) {
Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => {
info!("sending SIGKILL to pgbouncer process pid: {}", pid);
if let Err(e) = kill(pid, Signal::SIGKILL) {
error!("failed to terminate pgbouncer: {}", e);
}
}
// pgbouncer does not lock the pid file, so we read and kill the process directly
Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => {
if let Ok(pid_str) = std::fs::read_to_string(PGBOUNCER_PIDFILE) {
if let Ok(pid) = pid_str.trim().parse::<i32>() {
info!(
"sending SIGKILL to pgbouncer process pid: {} (from unlocked pid file)",
pid
);
if let Err(e) = kill(Pid::from_raw(pid), Signal::SIGKILL) {
error!("failed to terminate pgbouncer: {}", e);
}
}
} else {
info!("pgbouncer pid file exists but process not running");
}
}
Ok(pid_file::PidFileRead::NotExist) => {
info!("pgbouncer pid file not found, process may not be running");
}
Err(e) => {
error!("error reading pgbouncer pid file: {}", e);
}
}
// Terminate local_proxy
match pid_file::read("/etc/local_proxy/pid".into()) {
Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => {
info!("sending SIGTERM to local_proxy process pid: {}", pid);
if let Err(e) = kill(pid, Signal::SIGTERM) {
error!("failed to terminate local_proxy: {}", e);
}
}
Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => {
info!("local_proxy PID file exists but process not running");
}
Ok(pid_file::PidFileRead::NotExist) => {
info!("local_proxy PID file not found, process may not be running");
}
Err(e) => {
error!("error reading local_proxy PID file: {}", e);
}
}
} else {
info!("Skipping pgbouncer and local_proxy termination because in dev mode");
}
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
@@ -2289,3 +2407,21 @@ impl<T: 'static> JoinSetExt<T> for tokio::task::JoinSet<T> {
})
}
}
#[cfg(test)]
mod tests {
use std::fs::File;
use super::*;
#[test]
fn duplicate_safekeeper_connstring() {
let file = File::open("tests/cluster_spec.json").unwrap();
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
match ParsedSpec::try_from(spec.clone()) {
Ok(_p) => panic!("Failed to detect duplicate entry"),
Err(e) => assert!(e.starts_with("duplicate entry in safekeeper_connstrings:")),
};
}
}

View File

@@ -25,11 +25,16 @@ struct EndpointStoragePair {
}
const KEY: &str = "lfc_state";
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
type Error = anyhow::Error;
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
bail!("pspec.endpoint_id missing")
impl EndpointStoragePair {
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
/// If not None, takes precedence over pspec.spec.endpoint_id
fn from_spec_and_endpoint(
pspec: &crate::compute::ParsedSpec,
endpoint_id: Option<String>,
) -> Result<Self> {
let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
let Some(ref endpoint_id) = endpoint_id else {
bail!("pspec.endpoint_id missing, other endpoint_id not provided")
};
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
bail!("pspec.endpoint_storage_addr missing")
@@ -84,7 +89,7 @@ impl ComputeNode {
}
/// Returns false if there is a prewarm request ongoing, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
crate::metrics::LFC_PREWARM_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -97,7 +102,7 @@ impl ComputeNode {
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.prewarm_impl().await else {
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
@@ -109,13 +114,14 @@ impl ComputeNode {
true
}
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
/// from_endpoint: None for endpoint managed by this compute_ctl
fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().try_into()
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
}
async fn prewarm_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
@@ -173,7 +179,7 @@ impl ComputeNode {
}
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();

View File

@@ -51,7 +51,7 @@ pub fn write_postgres_conf(
// Write the postgresql.conf content from the spec file as is.
if let Some(conf) = &spec.cluster.postgresql_conf {
writeln!(file, "{}", conf)?;
writeln!(file, "{conf}")?;
}
// Add options for connecting to storage
@@ -70,7 +70,7 @@ pub fn write_postgres_conf(
);
// If generation is given, prepend sk list with g#number:
if let Some(generation) = spec.safekeepers_generation {
write!(neon_safekeepers_value, "g#{}:", generation)?;
write!(neon_safekeepers_value, "g#{generation}:")?;
}
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
writeln!(
@@ -109,8 +109,8 @@ pub fn write_postgres_conf(
tls::update_key_path_blocking(pgdata_path, tls_config);
// these are the default, but good to be explicit.
writeln!(file, "ssl_cert_file = '{}'", SERVER_CRT)?;
writeln!(file, "ssl_key_file = '{}'", SERVER_KEY)?;
writeln!(file, "ssl_cert_file = '{SERVER_CRT}'")?;
writeln!(file, "ssl_key_file = '{SERVER_KEY}'")?;
}
// Locales
@@ -191,8 +191,7 @@ pub fn write_postgres_conf(
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
"shared_preload_libraries='{libs}{extra_shared_preload_libraries}'"
)?;
} else {
// Typically, this should be unreacheable,
@@ -244,8 +243,7 @@ pub fn write_postgres_conf(
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
"shared_preload_libraries='{libs}{extra_shared_preload_libraries}'"
)?;
} else {
// Typically, this should be unreacheable,
@@ -263,7 +261,7 @@ pub fn write_postgres_conf(
}
}
writeln!(file, "neon.extension_server_port={}", extension_server_port)?;
writeln!(file, "neon.extension_server_port={extension_server_port}")?;
if spec.drop_subscriptions_before_start {
writeln!(file, "neon.disable_logical_replication_subscribers=true")?;
@@ -291,7 +289,7 @@ where
{
let path = pgdata_path.join("compute_ctl_temp_override.conf");
let mut file = File::create(path)?;
write!(file, "{}", options)?;
write!(file, "{options}")?;
let res = exec();

View File

@@ -310,10 +310,7 @@ async fn download_extension_tar(remote_ext_base_url: &Url, ext_path: &str) -> Re
async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)> {
let resp = reqwest::get(uri).await.map_err(|e| {
(
format!(
"could not perform remote extensions server request: {:?}",
e
),
format!("could not perform remote extensions server request: {e:?}"),
UNKNOWN_HTTP_STATUS.to_string(),
)
})?;
@@ -323,7 +320,7 @@ async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)
StatusCode::OK => match resp.bytes().await {
Ok(resp) => Ok(resp),
Err(e) => Err((
format!("could not read remote extensions server response: {:?}", e),
format!("could not read remote extensions server response: {e:?}"),
// It's fine to return and report error with status as 200 OK,
// because we still failed to read the response.
status.to_string(),
@@ -334,10 +331,7 @@ async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)
status.to_string(),
)),
_ => Err((
format!(
"unexpected remote extensions server response status code: {}",
status
),
format!("unexpected remote extensions server response status code: {status}"),
status.to_string(),
)),
}

View File

@@ -65,7 +65,7 @@ pub(in crate::http) async fn configure(
if state.status == ComputeStatus::Failed {
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
let msg = format!("compute configuration failed: {err:?}");
return Err(msg);
}
}

View File

@@ -2,6 +2,7 @@ use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use axum_extra::extract::OptionalQuery;
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
@@ -16,8 +17,16 @@ pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadS
Json(compute.lfc_offload_state())
}
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
if compute.prewarm_lfc() {
#[derive(serde::Deserialize)]
pub struct PrewarmQuery {
pub from_endpoint: String,
}
pub(in crate::http) async fn prewarm(
compute: Compute,
OptionalQuery(query): OptionalQuery<PrewarmQuery>,
) -> Response {
if compute.prewarm_lfc(query.map(|q| q.from_endpoint)) {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(

View File

@@ -1,32 +1,42 @@
use std::sync::Arc;
use crate::compute::{ComputeNode, forward_termination_signal};
use crate::http::JsonResponse;
use axum::extract::State;
use axum::response::{IntoResponse, Response};
use compute_api::responses::ComputeStatus;
use axum::response::Response;
use axum_extra::extract::OptionalQuery;
use compute_api::responses::{ComputeStatus, TerminateResponse};
use http::StatusCode;
use serde::Deserialize;
use std::sync::Arc;
use tokio::task;
use tracing::info;
use crate::compute::{ComputeNode, forward_termination_signal};
use crate::http::JsonResponse;
#[derive(Deserialize, Default)]
pub struct TerminateQuery {
mode: compute_api::responses::TerminateMode,
}
/// Terminate the compute.
pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>) -> Response {
pub(in crate::http) async fn terminate(
State(compute): State<Arc<ComputeNode>>,
OptionalQuery(terminate): OptionalQuery<TerminateQuery>,
) -> Response {
let mode = terminate.unwrap_or_default().mode;
{
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::Terminated {
return StatusCode::CREATED.into_response();
return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn);
}
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
return JsonResponse::invalid_status(state.status);
}
state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
drop(state);
state.set_status(
ComputeStatus::TerminationPending { mode },
&compute.state_changed,
);
}
forward_termination_signal();
forward_termination_signal(false);
info!("sent signal and notified waiters");
// Spawn a blocking thread to wait for compute to become Terminated.
@@ -34,7 +44,7 @@ pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>)
// be able to serve other requests while some particular request
// is waiting for compute to finish configuration.
let c = compute.clone();
task::spawn_blocking(move || {
let lsn = task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
while state.status != ComputeStatus::Terminated {
state = c.state_changed.wait(state).unwrap();
@@ -44,11 +54,10 @@ pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>)
state.status
);
}
state.terminate_flush_lsn
})
.await
.unwrap();
info!("terminated Postgres");
StatusCode::OK.into_response()
JsonResponse::success(StatusCode::OK, TerminateResponse { lsn })
}

View File

@@ -43,7 +43,7 @@ pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExten
let (mut client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -57,7 +57,7 @@ pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExten
let (client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});

View File

@@ -22,6 +22,7 @@ mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pgbouncer;
pub mod rsyslog;
pub mod spec;
mod spec_apply;

View File

@@ -130,7 +130,7 @@ fn try_acquire_lsn_lease(
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let mut client = config.connect(NoTls)?;
let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
let res = client.simple_query(&cmd)?;
let msg = match res.first() {
Some(msg) => msg,

View File

@@ -83,7 +83,9 @@ impl ComputeMonitor {
let compute_status = self.compute.get_status();
if matches!(
compute_status,
ComputeStatus::Terminated | ComputeStatus::TerminationPending | ComputeStatus::Failed
ComputeStatus::Terminated
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Failed
) {
info!(
"compute is in {} status, stopping compute monitor",

View File

@@ -36,9 +36,9 @@ pub fn escape_literal(s: &str) -> String {
let res = s.replace('\'', "''").replace('\\', "\\\\");
if res.contains('\\') {
format!("E'{}'", res)
format!("E'{res}'")
} else {
format!("'{}'", res)
format!("'{res}'")
}
}
@@ -46,7 +46,7 @@ pub fn escape_literal(s: &str) -> String {
/// with `'{}'` is not required, as it returns a ready-to-use config string.
pub fn escape_conf_value(s: &str) -> String {
let res = s.replace('\'', "''").replace('\\', "\\\\");
format!("'{}'", res)
format!("'{res}'")
}
pub trait GenericOptionExt {
@@ -446,7 +446,7 @@ pub async fn tune_pgbouncer(
let mut pgbouncer_connstr =
"host=localhost port=6432 dbname=pgbouncer user=postgres sslmode=disable".to_string();
if let Ok(pass) = std::env::var("PGBOUNCER_PASSWORD") {
pgbouncer_connstr.push_str(format!(" password={}", pass).as_str());
pgbouncer_connstr.push_str(format!(" password={pass}").as_str());
}
pgbouncer_connstr
};
@@ -464,7 +464,7 @@ pub async fn tune_pgbouncer(
Ok((client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
break client;

View File

@@ -0,0 +1 @@
pub const PGBOUNCER_PIDFILE: &str = "/tmp/pgbouncer.pid";

View File

@@ -23,12 +23,12 @@ fn do_control_plane_request(
) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", format!("Bearer {}", jwt))
.header("Authorization", format!("Bearer {jwt}"))
.send()
.map_err(|e| {
(
true,
format!("could not perform request to control plane: {:?}", e),
format!("could not perform request to control plane: {e:?}"),
UNKNOWN_HTTP_STATUS.to_string(),
)
})?;
@@ -39,7 +39,7 @@ fn do_control_plane_request(
Ok(spec_resp) => Ok(spec_resp),
Err(e) => Err((
true,
format!("could not deserialize control plane response: {:?}", e),
format!("could not deserialize control plane response: {e:?}"),
status.to_string(),
)),
},
@@ -62,7 +62,7 @@ fn do_control_plane_request(
// or some internal failure happened. Doesn't make much sense to retry in this case.
_ => Err((
false,
format!("unexpected control plane response status code: {}", status),
format!("unexpected control plane response status code: {status}"),
status.to_string(),
)),
}

View File

@@ -933,56 +933,53 @@ async fn get_operations<'a>(
PerDatabasePhase::DeleteDBRoleReferences => {
let ctx = ctx.read().await;
let operations =
spec.delta_operations
.iter()
.flatten()
.filter(|op| op.action == "delete_role")
.filter_map(move |op| {
if db.is_owned_by(&op.name) {
return None;
}
if !ctx.roles.contains_key(&op.name) {
return None;
}
let quoted = op.name.pg_quote();
let new_owner = match &db {
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
DB::UserDB(db) => db.owner.pg_quote(),
};
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
let operations = spec
.delta_operations
.iter()
.flatten()
.filter(|op| op.action == "delete_role")
.filter_map(move |op| {
if db.is_owned_by(&op.name) {
return None;
}
if !ctx.roles.contains_key(&op.name) {
return None;
}
let quoted = op.name.pg_quote();
let new_owner = match &db {
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
DB::UserDB(db) => db.owner.pg_quote(),
};
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
Some(vec![
// This will reassign all dependent objects to the db owner
Operation {
query: format!(
"REASSIGN OWNED BY {} TO {}",
quoted, new_owner,
),
comment: None,
},
// Revoke some potentially blocking privileges (Neon-specific currently)
Operation {
query: format!(
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
role_name = escaped_role,
outer_tag = outer_tag,
),
comment: None,
},
// This now will only drop privileges of the role
// TODO: this is obviously not 100% true because of the above case,
// there could be still some privileges that are not revoked. Maybe this
// only drops privileges that were granted *by this* role, not *to this* role,
// but this has to be checked.
Operation {
query: format!("DROP OWNED BY {}", quoted),
comment: None,
},
])
})
.flatten();
Some(vec![
// This will reassign all dependent objects to the db owner
Operation {
query: format!("REASSIGN OWNED BY {quoted} TO {new_owner}",),
comment: None,
},
// Revoke some potentially blocking privileges (Neon-specific currently)
Operation {
query: format!(
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
role_name = escaped_role,
outer_tag = outer_tag,
),
comment: None,
},
// This now will only drop privileges of the role
// TODO: this is obviously not 100% true because of the above case,
// there could be still some privileges that are not revoked. Maybe this
// only drops privileges that were granted *by this* role, not *to this* role,
// but this has to be checked.
Operation {
query: format!("DROP OWNED BY {quoted}"),
comment: None,
},
])
})
.flatten();
Ok(Box::new(operations))
}

View File

@@ -27,7 +27,7 @@ pub async fn ping_safekeeper(
let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});

View File

@@ -0,0 +1,6 @@
### Test files
The file `cluster_spec.json` has been copied over from libs/compute_api
tests, with some edits:
- the neon.safekeepers setting contains a duplicate value

View File

@@ -0,0 +1,245 @@
{
"format_version": 1.0,
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"cluster": {
"cluster_id": "test-cluster-42",
"name": "Zenith Test",
"state": "restarted",
"roles": [
{
"name": "postgres",
"encrypted_password": "6b1d16b78004bbd51fa06af9eda75972",
"options": null
},
{
"name": "alexk",
"encrypted_password": null,
"options": null
},
{
"name": "zenith \"new\"",
"encrypted_password": "5b1d16b78004bbd51fa06af9eda75972",
"options": null
},
{
"name": "zen",
"encrypted_password": "9b1d16b78004bbd51fa06af9eda75972"
},
{
"name": "\"name\";\\n select 1;",
"encrypted_password": "5b1d16b78004bbd51fa06af9eda75972"
},
{
"name": "MyRole",
"encrypted_password": "5b1d16b78004bbd51fa06af9eda75972"
}
],
"databases": [
{
"name": "DB2",
"owner": "alexk",
"options": [
{
"name": "LC_COLLATE",
"value": "C",
"vartype": "string"
},
{
"name": "LC_CTYPE",
"value": "C",
"vartype": "string"
},
{
"name": "TEMPLATE",
"value": "template0",
"vartype": "enum"
}
]
},
{
"name": "zenith",
"owner": "MyRole"
},
{
"name": "zen",
"owner": "zen"
}
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "logical",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "prewarm_lfc_on_startup",
"value": "off",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501,127.0.0.1:6502",
"vartype": "string"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "shared_buffers",
"value": "32768",
"vartype": "integer"
},
{
"name": "port",
"value": "55432",
"vartype": "integer"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "wal_sender_timeout",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "maintenance_work_mem",
"value": "65536",
"vartype": "integer"
},
{
"name": "max_parallel_workers",
"value": "8",
"vartype": "integer"
},
{
"name": "max_worker_processes",
"value": "8",
"vartype": "integer"
},
{
"name": "neon.tenant_id",
"value": "b0554b632bd4d547a63b86c3630317e8",
"vartype": "string"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "neon.timeline_id",
"value": "2414a61ffc94e428f14b5758fe308e13",
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": "host=127.0.0.1 port=6400",
"vartype": "string"
},
{
"name": "test.escaping",
"value": "here's a backslash \\ and a quote ' and a double-quote \" hooray",
"vartype": "string"
}
]
},
"delta_operations": [
{
"action": "delete_db",
"name": "zenith_test"
},
{
"action": "rename_db",
"name": "DB",
"new_name": "DB2"
},
{
"action": "delete_role",
"name": "zenith2"
},
{
"action": "rename_role",
"name": "zenith new",
"new_name": "zenith \"new\""
}
],
"remote_extensions": {
"library_index": {
"postgis-3": "postgis",
"libpgrouting-3.4": "postgis",
"postgis_raster-3": "postgis",
"postgis_sfcgal-3": "postgis",
"postgis_topology-3": "postgis",
"address_standardizer-3": "postgis"
},
"extension_data": {
"postgis": {
"archive_path": "5834329303/v15/extensions/postgis.tar.zst",
"control_data": {
"postgis.control": "# postgis extension\ncomment = ''PostGIS geometry and geography spatial types and functions''\ndefault_version = ''3.3.2''\nmodule_pathname = ''$libdir/postgis-3''\nrelocatable = false\ntrusted = true\n",
"pgrouting.control": "# pgRouting Extension\ncomment = ''pgRouting Extension''\ndefault_version = ''3.4.2''\nmodule_pathname = ''$libdir/libpgrouting-3.4''\nrelocatable = true\nrequires = ''plpgsql''\nrequires = ''postgis''\ntrusted = true\n",
"postgis_raster.control": "# postgis_raster extension\ncomment = ''PostGIS raster types and functions''\ndefault_version = ''3.3.2''\nmodule_pathname = ''$libdir/postgis_raster-3''\nrelocatable = false\nrequires = postgis\ntrusted = true\n",
"postgis_sfcgal.control": "# postgis topology extension\ncomment = ''PostGIS SFCGAL functions''\ndefault_version = ''3.3.2''\nrelocatable = true\nrequires = postgis\ntrusted = true\n",
"postgis_topology.control": "# postgis topology extension\ncomment = ''PostGIS topology spatial types and functions''\ndefault_version = ''3.3.2''\nrelocatable = false\nschema = topology\nrequires = postgis\ntrusted = true\n",
"address_standardizer.control": "# address_standardizer extension\ncomment = ''Used to parse an address into constituent elements. Generally used to support geocoding address normalization step.''\ndefault_version = ''3.3.2''\nrelocatable = true\ntrusted = true\n",
"postgis_tiger_geocoder.control": "# postgis tiger geocoder extension\ncomment = ''PostGIS tiger geocoder and reverse geocoder''\ndefault_version = ''3.3.2''\nrelocatable = false\nschema = tiger\nrequires = ''postgis,fuzzystrmatch''\nsuperuser= false\ntrusted = true\n",
"address_standardizer_data_us.control": "# address standardizer us dataset\ncomment = ''Address Standardizer US dataset example''\ndefault_version = ''3.3.2''\nrelocatable = true\ntrusted = true\n"
}
}
},
"custom_extensions": [],
"public_extensions": ["postgis"]
},
"pgbouncer_settings": {
"default_pool_size": "42",
"pool_mode": "session"
}
}

View File

@@ -18,7 +18,7 @@ use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode, PageserverProtocol};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
@@ -605,6 +605,14 @@ struct EndpointCreateCmdArgs {
#[clap(long, help = "Postgres version")]
pg_version: u32,
/// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
///
/// Specified on creation such that it's retained across reconfiguration and restarts.
///
/// NB: not yet supported by computes.
#[clap(long)]
grpc: bool,
#[clap(
long,
help = "If set, the node will be a hot replica on the specified timeline",
@@ -664,6 +672,13 @@ struct EndpointStartCmdArgs {
#[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")]
#[arg(default_value = "90s")]
start_timeout: Duration,
#[clap(
long,
help = "Run in development mode, skipping VM-specific operations like process termination",
action = clap::ArgAction::SetTrue
)]
dev: bool,
}
#[derive(clap::Args)]
@@ -696,10 +711,9 @@ struct EndpointStopCmdArgs {
)]
destroy: bool,
#[clap(long, help = "Postgres shutdown mode, passed to \"pg_ctl -m <mode>\"")]
#[arg(value_parser(["smart", "fast", "immediate"]))]
#[arg(default_value = "fast")]
mode: String,
#[clap(long, help = "Postgres shutdown mode")]
#[clap(default_value = "fast")]
mode: EndpointTerminateMode,
}
#[derive(clap::Args)]
@@ -905,7 +919,7 @@ fn print_timeline(
br_sym = "┗━";
}
print!("{} @{}: ", br_sym, ancestor_lsn);
print!("{br_sym} @{ancestor_lsn}: ");
}
// Finally print a timeline id and name with new line
@@ -1451,6 +1465,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
args.internal_http_port,
args.pg_version,
mode,
args.grpc,
!args.update_catalog,
false,
)?;
@@ -1491,13 +1506,20 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
(
vec![(parsed.0, parsed.1.unwrap_or(5432))],
// If caller is telling us what pageserver to use, this is not a tenant which is
// full managed by storage controller, therefore not sharded.
DEFAULT_STRIPE_SIZE,
)
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
};
// If caller is telling us what pageserver to use, this is not a tenant which is
// fully managed by storage controller, therefore not sharded.
(vec![pageserver], DEFAULT_STRIPE_SIZE)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
// to pass these on to postgres.
@@ -1516,11 +1538,20 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.await?;
}
anyhow::Ok((
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
))
let pageserver = if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))?,
shard.listen_grpc_port.expect("no gRPC port"),
)
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr)?,
shard.listen_pg_port,
)
};
anyhow::Ok(pageserver)
}),
)
.await?;
@@ -1565,6 +1596,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
stripe_size.0 as usize,
args.create_test_user,
args.start_timeout,
args.dev,
)
.await?;
}
@@ -1575,11 +1607,19 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
vec![(
pageserver.pg_connection_config.host().clone(),
pageserver.pg_connection_config.port(),
)]
let conf = env.get_pageserver_conf(ps_id)?;
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
};
vec![pageserver]
} else {
let storage_controller = StorageController::from_env(env);
storage_controller
@@ -1588,11 +1628,21 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported malformed host"),
shard.listen_pg_port,
)
// Use gRPC if requested.
if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))
.expect("bad hostname"),
shard.listen_grpc_port.expect("no gRPC port"),
)
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr).expect("bad hostname"),
shard.listen_pg_port,
)
}
})
.collect::<Vec<_>>()
};
@@ -1607,7 +1657,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.endpoints
.get(endpoint_id)
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.stop(&args.mode, args.destroy)?;
match endpoint.stop(args.mode, args.destroy).await?.lsn {
Some(lsn) => println!("{lsn}"),
None => println!("null"),
}
}
EndpointCmd::GenerateJwt(args) => {
let endpoint = {
@@ -1689,7 +1742,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
StopMode::Immediate => true,
};
if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
eprintln!("pageserver stop failed: {}", e);
eprintln!("pageserver stop failed: {e}");
exit(1);
}
}
@@ -1698,7 +1751,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
let pageserver = get_pageserver(env, args.pageserver_id)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
eprintln!("pageserver stop failed: {e}");
exit(1);
}
@@ -1715,7 +1768,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
{
Ok(_) => println!("Page server is up and running"),
Err(err) => {
eprintln!("Page server is not available: {}", err);
eprintln!("Page server is not available: {err}");
exit(1);
}
}
@@ -1752,7 +1805,7 @@ async fn handle_storage_controller(
},
};
if let Err(e) = svc.stop(stop_args).await {
eprintln!("stop failed: {}", e);
eprintln!("stop failed: {e}");
exit(1);
}
}
@@ -1774,7 +1827,7 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
let safekeeper = get_safekeeper(env, args.id)?;
if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
eprintln!("safekeeper start failed: {}", e);
eprintln!("safekeeper start failed: {e}");
exit(1);
}
}
@@ -1786,7 +1839,7 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
StopMode::Immediate => true,
};
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper stop failed: {}", e);
eprintln!("safekeeper stop failed: {e}");
exit(1);
}
}
@@ -1799,12 +1852,12 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
};
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper stop failed: {}", e);
eprintln!("safekeeper stop failed: {e}");
exit(1);
}
if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
eprintln!("safekeeper start failed: {}", e);
eprintln!("safekeeper start failed: {e}");
exit(1);
}
}
@@ -2039,11 +2092,16 @@ async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Resul
}
async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let mode = if immediate {
EndpointTerminateMode::Immediate
} else {
EndpointTerminateMode::Fast
};
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.endpoints {
if let Err(e) = node.stop(if immediate { "immediate" } else { "fast" }, false) {
if let Err(e) = node.stop(mode, false).await {
eprintln!("postgres stop failed: {e:#}");
}
}
@@ -2055,7 +2113,7 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let storage = EndpointStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("endpoint_storage stop failed: {:#}", e);
eprintln!("endpoint_storage stop failed: {e:#}");
}
for ps_conf in &env.pageservers {

View File

@@ -37,6 +37,7 @@
//! ```
//!
use std::collections::BTreeMap;
use std::fmt::Display;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::path::PathBuf;
use std::process::Command;
@@ -45,11 +46,14 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use base64::Engine;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use compute_api::requests::{
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TerminateResponse,
TlsConfig,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
@@ -74,7 +78,6 @@ use utils::id::{NodeId, TenantId, TimelineId};
use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;
// contents of a endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
@@ -87,6 +90,7 @@ pub struct EndpointConf {
external_http_port: u16,
internal_http_port: u16,
pg_version: u32,
grpc: bool,
skip_pg_catalog_updates: bool,
reconfigure_concurrency: usize,
drop_subscriptions_before_start: bool,
@@ -164,7 +168,7 @@ impl ComputeControlPlane {
public_key_use: Some(PublicKeyUse::Signature),
key_operations: Some(vec![KeyOperations::Verify]),
key_algorithm: Some(KeyAlgorithm::EdDSA),
key_id: Some(base64::encode_config(key_hash, base64::URL_SAFE_NO_PAD)),
key_id: Some(BASE64_URL_SAFE_NO_PAD.encode(key_hash)),
x509_url: None::<String>,
x509_chain: None::<Vec<String>>,
x509_sha1_fingerprint: None::<String>,
@@ -173,7 +177,7 @@ impl ComputeControlPlane {
algorithm: AlgorithmParameters::OctetKeyPair(OctetKeyPairParameters {
key_type: OctetKeyPairType::OctetKeyPair,
curve: EllipticCurve::Ed25519,
x: base64::encode_config(public_key, base64::URL_SAFE_NO_PAD),
x: BASE64_URL_SAFE_NO_PAD.encode(public_key),
}),
}],
})
@@ -190,6 +194,7 @@ impl ComputeControlPlane {
internal_http_port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
grpc: bool,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
) -> Result<Arc<Endpoint>> {
@@ -224,6 +229,7 @@ impl ComputeControlPlane {
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates,
drop_subscriptions_before_start,
grpc,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
@@ -242,6 +248,7 @@ impl ComputeControlPlane {
internal_http_port,
pg_port,
pg_version,
grpc,
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
@@ -296,6 +303,8 @@ pub struct Endpoint {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mode: ComputeMode,
/// If true, the endpoint should use gRPC to communicate with Pageservers.
pub grpc: bool,
// port and address of the Postgres server and `compute_ctl`'s HTTP APIs
pub pg_address: SocketAddr,
@@ -331,15 +340,58 @@ pub enum EndpointStatus {
RunningNoPidfile,
}
impl std::fmt::Display for EndpointStatus {
impl Display for EndpointStatus {
fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
let s = match self {
writer.write_str(match self {
Self::Running => "running",
Self::Stopped => "stopped",
Self::Crashed => "crashed",
Self::RunningNoPidfile => "running, no pidfile",
};
write!(writer, "{}", s)
})
}
}
#[derive(Default, Clone, Copy, clap::ValueEnum)]
pub enum EndpointTerminateMode {
#[default]
/// Use pg_ctl stop -m fast
Fast,
/// Use pg_ctl stop -m immediate
Immediate,
/// Use /terminate?mode=immediate
ImmediateTerminate,
}
impl std::fmt::Display for EndpointTerminateMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match &self {
EndpointTerminateMode::Fast => "fast",
EndpointTerminateMode::Immediate => "immediate",
EndpointTerminateMode::ImmediateTerminate => "immediate-terminate",
})
}
}
/// Protocol used to connect to a Pageserver.
#[derive(Clone, Copy, Debug)]
pub enum PageserverProtocol {
Libpq,
Grpc,
}
impl PageserverProtocol {
/// Returns the URL scheme for the protocol, used in connstrings.
pub fn scheme(&self) -> &'static str {
match self {
Self::Libpq => "postgresql",
Self::Grpc => "grpc",
}
}
}
impl Display for PageserverProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.scheme())
}
}
@@ -378,6 +430,7 @@ impl Endpoint {
mode: conf.mode,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
grpc: conf.grpc,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
reconfigure_concurrency: conf.reconfigure_concurrency,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
@@ -606,10 +659,10 @@ impl Endpoint {
}
}
fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String {
pageservers
.iter()
.map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
.map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}"))
.collect::<Vec<_>>()
.join(",")
}
@@ -654,11 +707,12 @@ impl Endpoint {
endpoint_storage_addr: String,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(Host, u16)>,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
remote_ext_base_url: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
start_timeout: Duration,
dev: bool,
) -> Result<()> {
if self.status() == EndpointStatus::Running {
anyhow::bail!("The endpoint is already running");
@@ -792,10 +846,10 @@ impl Endpoint {
// Launch compute_ctl
let conn_str = self.connstr("cloud_admin", "postgres");
println!("Starting postgres node at '{}'", conn_str);
println!("Starting postgres node at '{conn_str}'");
if create_test_user {
let conn_str = self.connstr("test", "neondb");
println!("Also at '{}'", conn_str);
println!("Also at '{conn_str}'");
}
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
cmd.args([
@@ -829,6 +883,10 @@ impl Endpoint {
cmd.args(["--remote-ext-base-url", remote_ext_base_url]);
}
if dev {
cmd.arg("--dev");
}
let child = cmd.spawn()?;
// set up a scopeguard to kill & wait for the child in case we panic or bail below
let child = scopeguard::guard(child, |mut child| {
@@ -881,7 +939,7 @@ impl Endpoint {
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration
| ComputeStatus::TerminationPending
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Terminated => {
bail!("unexpected compute status: {:?}", state.status)
}
@@ -890,8 +948,7 @@ impl Endpoint {
Err(e) => {
if Instant::now().duration_since(start_at) > start_timeout {
return Err(e).context(format!(
"timed out {:?} waiting to connect to compute_ctl HTTP",
start_timeout,
"timed out {start_timeout:?} waiting to connect to compute_ctl HTTP",
));
}
}
@@ -930,7 +987,7 @@ impl Endpoint {
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
let url = response.url().to_owned();
let msg = match response.text().await {
Ok(err_body) => format!("Error: {}", err_body),
Ok(err_body) => format!("Error: {err_body}"),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
};
Err(anyhow::anyhow!(msg))
@@ -939,10 +996,12 @@ impl Endpoint {
pub async fn reconfigure(
&self,
mut pageservers: Vec<(Host, u16)>,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
let (mut spec, compute_ctl_config) = {
let config_path = self.endpoint_path().join("config.json");
let file = std::fs::File::open(config_path)?;
@@ -954,25 +1013,7 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
// If we weren't given explicit pageservers, query the storage controller
if pageservers.is_empty() {
let storage_controller = StorageController::from_env(&self.env);
let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
pageservers = locate_result
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
)
})
.collect::<Vec<_>>();
}
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstr.is_empty());
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
@@ -1012,15 +1053,34 @@ impl Endpoint {
} else {
let url = response.url().to_owned();
let msg = match response.text().await {
Ok(err_body) => format!("Error: {}", err_body),
Ok(err_body) => format!("Error: {err_body}"),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
};
Err(anyhow::anyhow!(msg))
}
}
pub fn stop(&self, mode: &str, destroy: bool) -> Result<()> {
self.pg_ctl(&["-m", mode, "stop"], &None)?;
pub async fn stop(
&self,
mode: EndpointTerminateMode,
destroy: bool,
) -> Result<TerminateResponse> {
// pg_ctl stop is fast but doesn't allow us to collect LSN. /terminate is
// slow, and test runs time out. Solution: special mode "immediate-terminate"
// which uses /terminate
let response = if let EndpointTerminateMode::ImmediateTerminate = mode {
let ip = self.external_http_address.ip();
let port = self.external_http_address.port();
let url = format!("http://{ip}:{port}/terminate?mode=immediate");
let token = self.generate_jwt(Some(ComputeClaimsScope::Admin))?;
let request = reqwest::Client::new().post(url).bearer_auth(token);
let response = request.send().await.context("/terminate")?;
let text = response.text().await.context("/terminate result")?;
serde_json::from_str(&text).with_context(|| format!("deserializing {text}"))?
} else {
self.pg_ctl(&["-m", &mode.to_string(), "stop"], &None)?;
TerminateResponse { lsn: None }
};
// Also wait for the compute_ctl process to die. It might have some
// cleanup work to do after postgres stops, like syncing safekeepers,
@@ -1030,7 +1090,7 @@ impl Endpoint {
// waiting. Sometimes we do *not* want this cleanup: tests intentionally
// do stop when majority of safekeepers is down, so sync-safekeepers
// would hang otherwise. This could be a separate flag though.
let send_sigterm = destroy || mode == "immediate";
let send_sigterm = destroy || !matches!(mode, EndpointTerminateMode::Fast);
self.wait_for_compute_ctl_to_exit(send_sigterm)?;
if destroy {
println!(
@@ -1039,7 +1099,7 @@ impl Endpoint {
);
std::fs::remove_dir_all(self.endpoint_path())?;
}
Ok(())
Ok(response)
}
pub fn connstr(&self, user: &str, db_name: &str) -> String {

View File

@@ -209,6 +209,10 @@ pub struct NeonStorageControllerConf {
pub use_https_safekeeper_api: bool,
pub use_local_compute_notifications: bool,
pub timeline_safekeeper_count: Option<i64>,
pub kick_secondary_downloads: Option<bool>,
}
impl NeonStorageControllerConf {
@@ -236,9 +240,11 @@ impl Default for NeonStorageControllerConf {
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
long_reconcile_threshold: None,
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
timelines_onto_safekeepers: true,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
timeline_safekeeper_count: None,
kick_secondary_downloads: None,
}
}
}
@@ -254,7 +260,7 @@ impl Default for EndpointStorageConf {
impl NeonBroker {
pub fn client_url(&self) -> Url {
let url = if let Some(addr) = self.listen_https_addr {
format!("https://{}", addr)
format!("https://{addr}")
} else {
format!(
"http://{}",
@@ -727,7 +733,7 @@ impl LocalEnv {
let config_toml_path = dentry.path().join("pageserver.toml");
let config_toml: PageserverConfigTomlSubset = toml_edit::de::from_str(
&std::fs::read_to_string(&config_toml_path)
.with_context(|| format!("read {:?}", config_toml_path))?,
.with_context(|| format!("read {config_toml_path:?}"))?,
)
.context("parse pageserver.toml")?;
let identity_toml_path = dentry.path().join("identity.toml");
@@ -737,7 +743,7 @@ impl LocalEnv {
}
let identity_toml: IdentityTomlSubset = toml_edit::de::from_str(
&std::fs::read_to_string(&identity_toml_path)
.with_context(|| format!("read {:?}", identity_toml_path))?,
.with_context(|| format!("read {identity_toml_path:?}"))?,
)
.context("parse identity.toml")?;
let PageserverConfigTomlSubset {

View File

@@ -16,6 +16,7 @@ use std::time::Duration;
use anyhow::{Context, bail};
use camino::Utf8PathBuf;
use pageserver_api::config::{DEFAULT_GRPC_LISTEN_PORT, DEFAULT_HTTP_LISTEN_PORT};
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -120,7 +121,7 @@ impl PageServerNode {
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
overrides.push(format!("control_plane_api_token='{jwt_token}'"));
}
if !conf.other.contains_key("remote_storage") {
@@ -252,9 +253,10 @@ impl PageServerNode {
// the storage controller
let metadata_path = datadir.join("metadata.json");
let (_http_host, http_port) =
let http_host = "localhost".to_string();
let (_, http_port) =
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
let http_port = http_port.unwrap_or(9898);
let http_port = http_port.unwrap_or(DEFAULT_HTTP_LISTEN_PORT);
let https_port = match self.conf.listen_https_addr.as_ref() {
Some(https_addr) => {
@@ -265,6 +267,13 @@ impl PageServerNode {
None => None,
};
let (mut grpc_host, mut grpc_port) = (None, None);
if let Some(grpc_addr) = &self.conf.listen_grpc_addr {
let (_, port) = parse_host_port(grpc_addr).expect("Unable to parse listen_grpc_addr");
grpc_host = Some("localhost".to_string());
grpc_port = Some(port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT));
}
// Intentionally hand-craft JSON: this acts as an implicit format compat test
// in case the pageserver-side structure is edited, and reflects the real life
// situation: the metadata is written by some other script.
@@ -273,7 +282,9 @@ impl PageServerNode {
serde_json::to_vec(&pageserver_api::config::NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: self.pg_connection_config.port(),
http_host: "localhost".to_string(),
grpc_host,
grpc_port,
http_host,
http_port,
https_port,
other: HashMap::from([(

View File

@@ -143,7 +143,7 @@ impl SafekeeperNode {
let id_string = id.to_string();
// TODO: add availability_zone to the config.
// Right now we just specify any value here and use it to check metrics in tests.
let availability_zone = format!("sk-{}", id_string);
let availability_zone = format!("sk-{id_string}");
let mut args = vec![
"-D".to_owned(),

View File

@@ -167,7 +167,7 @@ impl StorageController {
fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf {
self.env
.base_data_dir
.join(format!("storage_controller_{}", instance_id))
.join(format!("storage_controller_{instance_id}"))
}
fn pid_file(&self, instance_id: u8) -> Utf8PathBuf {
@@ -220,7 +220,7 @@ impl StorageController {
"-d",
DB_NAME,
"-p",
&format!("{}", postgres_port),
&format!("{postgres_port}"),
];
let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
let envs = [
@@ -263,7 +263,7 @@ impl StorageController {
"-h",
"localhost",
"-p",
&format!("{}", postgres_port),
&format!("{postgres_port}"),
"-U",
&username(),
"-O",
@@ -425,7 +425,7 @@ impl StorageController {
// from `LocalEnv`'s config file (`.neon/config`).
tokio::fs::write(
&pg_data_path.join("postgresql.conf"),
format!("port = {}\nfsync=off\n", postgres_port),
format!("port = {postgres_port}\nfsync=off\n"),
)
.await?;
@@ -477,7 +477,7 @@ impl StorageController {
self.setup_database(postgres_port).await?;
}
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);
let database_url = format!("postgresql://localhost:{postgres_port}/{DB_NAME}");
// We support running a startup SQL script to fiddle with the database before we launch storcon.
// This is used by the test suite.
@@ -508,7 +508,7 @@ impl StorageController {
drop(client);
conn.await??;
let addr = format!("{}:{}", host, listen_port);
let addr = format!("{host}:{listen_port}");
let address_for_peers = Uri::builder()
.scheme(scheme)
.authority(addr.clone())
@@ -557,6 +557,10 @@ impl StorageController {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(value) = self.config.kick_secondary_downloads {
args.push(format!("--kick-secondary-downloads={value}"));
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}
@@ -628,6 +632,10 @@ impl StorageController {
args.push("--timelines-onto-safekeepers".to_string());
}
if let Some(sk_cnt) = self.config.timeline_safekeeper_count {
args.push(format!("--timeline-safekeeper-count={sk_cnt}"));
}
println!("Starting storage controller");
background_process::start_process(
@@ -802,9 +810,9 @@ impl StorageController {
builder = builder.json(&body)
}
if let Some(private_key) = &self.private_key {
println!("Getting claims for path {}", path);
println!("Getting claims for path {path}");
if let Some(required_claims) = Self::get_claims_for_path(&path)? {
println!("Got claims {:?} for path {}", required_claims, path);
println!("Got claims {required_claims:?} for path {path}");
let jwt_token = encode_from_key_file(&required_claims, private_key)?;
builder = builder.header(
reqwest::header::AUTHORIZATION,

View File

@@ -36,6 +36,10 @@ enum Command {
listen_pg_addr: String,
#[arg(long)]
listen_pg_port: u16,
#[arg(long)]
listen_grpc_addr: Option<String>,
#[arg(long)]
listen_grpc_port: Option<u16>,
#[arg(long)]
listen_http_addr: String,
@@ -418,6 +422,8 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,
@@ -431,6 +437,8 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,
@@ -641,7 +649,7 @@ async fn main() -> anyhow::Result<()> {
response
.new_shards
.iter()
.map(|s| format!("{:?}", s))
.map(|s| format!("{s:?}"))
.collect::<Vec<_>>()
.join(",")
);
@@ -763,8 +771,8 @@ async fn main() -> anyhow::Result<()> {
println!("Tenant {tenant_id}");
let mut table = comfy_table::Table::new();
table.add_row(["Policy", &format!("{:?}", policy)]);
table.add_row(["Stripe size", &format!("{:?}", stripe_size)]);
table.add_row(["Policy", &format!("{policy:?}")]);
table.add_row(["Stripe size", &format!("{stripe_size:?}")]);
table.add_row(["Config", &serde_json::to_string_pretty(&config).unwrap()]);
println!("{table}");
println!("Shards:");
@@ -781,7 +789,7 @@ async fn main() -> anyhow::Result<()> {
let secondary = shard
.node_secondary
.iter()
.map(|n| format!("{}", n))
.map(|n| format!("{n}"))
.collect::<Vec<_>>()
.join(",");
@@ -855,7 +863,7 @@ async fn main() -> anyhow::Result<()> {
}
} else {
// Make it obvious to the user that since they've omitted an AZ, we're clearing it
eprintln!("Clearing preferred AZ for tenant {}", tenant_id);
eprintln!("Clearing preferred AZ for tenant {tenant_id}");
}
// Construct a request that modifies all the tenant's shards
@@ -1126,8 +1134,7 @@ async fn main() -> anyhow::Result<()> {
Err((tenant_shard_id, from, to, error)) => {
failure += 1;
println!(
"Failed to migrate {} from node {} to node {}: {}",
tenant_shard_id, from, to, error
"Failed to migrate {tenant_shard_id} from node {from} to node {to}: {error}"
);
}
}
@@ -1269,8 +1276,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let mut path = format!(
"/v1/tenant/{}/timeline/{}/download_heatmap_layers",
tenant_shard_id, timeline_id,
"/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
);
if let Some(c) = concurrency {
@@ -1295,8 +1301,7 @@ async fn watch_tenant_shard(
) -> anyhow::Result<()> {
if let Some(until_migrated_to) = until_migrated_to {
println!(
"Waiting for tenant shard {} to be migrated to node {}",
tenant_shard_id, until_migrated_to
"Waiting for tenant shard {tenant_shard_id} to be migrated to node {until_migrated_to}"
);
}
@@ -1319,7 +1324,7 @@ async fn watch_tenant_shard(
"attached: {} secondary: {} {}",
shard
.node_attached
.map(|n| format!("{}", n))
.map(|n| format!("{n}"))
.unwrap_or("none".to_string()),
shard
.node_secondary
@@ -1333,15 +1338,12 @@ async fn watch_tenant_shard(
"(reconciler idle)"
}
);
println!("{}", summary);
println!("{summary}");
// Maybe drop out if we finished migration
if let Some(until_migrated_to) = until_migrated_to {
if shard.node_attached == Some(until_migrated_to) && !shard.is_reconciling {
println!(
"Tenant shard {} is now on node {}",
tenant_shard_id, until_migrated_to
);
println!("Tenant shard {tenant_shard_id} is now on node {until_migrated_to}");
break;
}
}

View File

@@ -95,3 +95,4 @@ echo "Start compute node"
-b /usr/local/bin/postgres \
--compute-id "compute-${RANDOM}" \
--config "${CONFIG_FILE}"
--dev

View File

@@ -374,7 +374,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let request = Request::builder()
.uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
.method(method)
.header("Authorization", format!("Bearer {}", token))
.header("Authorization", format!("Bearer {token}"))
.body(Body::empty())
.unwrap();
let status = ServiceExt::ready(&mut app)

View File

@@ -31,13 +31,12 @@ struct Args {
}
#[derive(serde::Deserialize)]
#[serde(tag = "type")]
struct Config {
#[serde(default = "listen")]
listen: std::net::SocketAddr,
pemfile: camino::Utf8PathBuf,
#[serde(flatten)]
storage_config: remote_storage::RemoteStorageConfig,
storage_kind: remote_storage::TypedRemoteStorageKind,
#[serde(default = "max_upload_file_limit")]
max_upload_file_limit: usize,
}
@@ -70,7 +69,8 @@ async fn main() -> anyhow::Result<()> {
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
info!("listening on {}", listener.local_addr().unwrap());
let storage = remote_storage::GenericRemoteStorage::from_config(&config.storage_config).await?;
let storage =
remote_storage::GenericRemoteStorage::from_storage_kind(config.storage_kind).await?;
let cancel = tokio_util::sync::CancellationToken::new();
if !args.no_s3_check_on_startup {
app::check_storage_permissions(&storage, cancel.clone()).await?;

View File

@@ -16,6 +16,7 @@ pub static COMPUTE_AUDIENCE: &str = "compute";
pub enum ComputeClaimsScope {
/// An admin-scoped token allows access to all of `compute_ctl`'s authorized
/// facilities.
#[serde(rename = "compute_ctl:admin")]
Admin,
}
@@ -24,7 +25,7 @@ impl FromStr for ComputeClaimsScope {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"admin" => Ok(ComputeClaimsScope::Admin),
"compute_ctl:admin" => Ok(ComputeClaimsScope::Admin),
_ => Err(anyhow::anyhow!("invalid compute claims scope \"{s}\"")),
}
}
@@ -80,3 +81,23 @@ pub struct SetRoleGrantsRequest {
pub privileges: Vec<Privilege>,
pub role: PgIdent,
}
#[cfg(test)]
mod test {
use std::str::FromStr;
use crate::requests::ComputeClaimsScope;
/// Confirm that whether we parse the scope by string or through serde, the
/// same values parse to the same enum variant.
#[test]
fn compute_request_scopes() {
const ADMIN_SCOPE: &str = "compute_ctl:admin";
let from_serde: ComputeClaimsScope =
serde_json::from_str(&format!("\"{ADMIN_SCOPE}\"")).unwrap();
let from_str = ComputeClaimsScope::from_str(ADMIN_SCOPE).unwrap();
assert_eq!(from_serde, from_str);
}
}

View File

@@ -83,6 +83,16 @@ pub struct ComputeStatusResponse {
pub error: Option<String>,
}
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum TerminateMode {
#[default]
/// wait 30s till returning from /terminate to allow control plane to get the error
Fast,
/// return from /terminate immediately as soon as all components are terminated
Immediate,
}
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
@@ -103,11 +113,16 @@ pub enum ComputeStatus {
// control-plane to terminate it.
Failed,
// Termination requested
TerminationPending,
TerminationPending { mode: TerminateMode },
// Terminated Postgres
Terminated,
}
#[derive(Deserialize, Serialize)]
pub struct TerminateResponse {
pub lsn: Option<utils::lsn::Lsn>,
}
impl Display for ComputeStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@@ -117,7 +132,7 @@ impl Display for ComputeStatus {
ComputeStatus::Running => f.write_str("running"),
ComputeStatus::Configuration => f.write_str("configuration"),
ComputeStatus::Failed => f.write_str("failed"),
ComputeStatus::TerminationPending => f.write_str("termination-pending"),
ComputeStatus::TerminationPending { .. } => f.write_str("termination-pending"),
ComputeStatus::Terminated => f.write_str("terminated"),
}
}

View File

@@ -71,7 +71,7 @@ impl Runtime {
debug!("thread panicked: {:?}", e);
let mut result = ctx.result.lock();
if result.0 == -1 {
*result = (256, format!("thread panicked: {:?}", e));
*result = (256, format!("thread panicked: {e:?}"));
}
});
}
@@ -419,13 +419,13 @@ pub fn now() -> u64 {
with_thread_context(|ctx| ctx.clock.get().unwrap().now())
}
pub fn exit(code: i32, msg: String) {
pub fn exit(code: i32, msg: String) -> ! {
with_thread_context(|ctx| {
ctx.allow_panic.store(true, Ordering::SeqCst);
let mut result = ctx.result.lock();
*result = (code, msg);
panic!("exit");
});
})
}
pub(crate) fn get_thread_ctx() -> Arc<ThreadContext> {

View File

@@ -47,8 +47,8 @@ impl Debug for AnyMessage {
match self {
AnyMessage::None => write!(f, "None"),
AnyMessage::InternalConnect => write!(f, "InternalConnect"),
AnyMessage::Just32(v) => write!(f, "Just32({})", v),
AnyMessage::ReplCell(v) => write!(f, "ReplCell({:?})", v),
AnyMessage::Just32(v) => write!(f, "Just32({v})"),
AnyMessage::ReplCell(v) => write!(f, "ReplCell({v:?})"),
AnyMessage::Bytes(v) => write!(f, "Bytes({})", hex::encode(v)),
AnyMessage::LSN(v) => write!(f, "LSN({})", Lsn(*v)),
}

View File

@@ -582,14 +582,14 @@ pub fn attach_openapi_ui(
deepLinking: true,
showExtensions: true,
showCommonExtensions: true,
url: "{}",
url: "{spec_mount_path}",
}})
window.ui = ui;
}};
</script>
</body>
</html>
"#, spec_mount_path))).unwrap())
"#))).unwrap())
})
)
}
@@ -696,7 +696,7 @@ mod tests {
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
let mut service = builder.build(remote_addr);
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
panic!("request service is not ready: {:?}", e);
panic!("request service is not ready: {e:?}");
}
let mut req: Request<Body> = Request::default();
@@ -716,7 +716,7 @@ mod tests {
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
let mut service = builder.build(remote_addr);
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
panic!("request service is not ready: {:?}", e);
panic!("request service is not ready: {e:?}");
}
let req: Request<Body> = Request::default();

View File

@@ -86,7 +86,7 @@ impl ShmemHandle {
// somewhat smaller than that, because with anything close to that, you'll run out of
// memory anyway.
if max_size >= 1 << 48 {
panic!("max size {} too large", max_size);
panic!("max size {max_size} too large");
}
if initial_size > max_size {
panic!("initial size {initial_size} larger than max size {max_size}");
@@ -279,7 +279,7 @@ mod tests {
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
for i in range {
let b = unsafe { *(ptr.add(i)) };
assert_eq!(expected, b, "unexpected byte at offset {}", i);
assert_eq!(expected, b, "unexpected byte at offset {i}");
}
}

View File

@@ -17,7 +17,7 @@ anyhow.workspace = true
bytes.workspace = true
byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true

View File

@@ -12,6 +12,7 @@ pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LI
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
use std::collections::HashMap;
use std::fmt::Display;
use std::num::{NonZeroU64, NonZeroUsize};
use std::str::FromStr;
use std::time::Duration;
@@ -24,16 +25,17 @@ use utils::logging::LogFormat;
use crate::models::{ImageCompressionAlgorithm, LsnLease};
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
// as a separate structure. This information is not neeed by the pageserver
// as a separate structure. This information is not needed by the pageserver
// itself, it is only used for registering the pageserver with the control
// plane and/or storage controller.
//
#[derive(PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
pub struct NodeMetadata {
#[serde(rename = "host")]
pub postgres_host: String,
#[serde(rename = "port")]
pub postgres_port: u16,
pub grpc_host: Option<String>,
pub grpc_port: Option<u16>,
pub http_host: String,
pub http_port: u16,
pub https_port: Option<u16>,
@@ -44,6 +46,23 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
impl Display for NodeMetadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"postgresql://{}:{} ",
self.postgres_host, self.postgres_port
)?;
if let Some(grpc_host) = &self.grpc_host {
let grpc_port = self.grpc_port.unwrap_or_default();
write!(f, "grpc://{grpc_host}:{grpc_port} ")?;
}
write!(f, "http://{}:{} ", self.http_host, self.http_port)?;
write!(f, "other:{:?}", self.other)?;
Ok(())
}
}
/// PostHog integration config.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
@@ -57,6 +76,10 @@ pub struct PostHogConfig {
pub private_api_url: String,
/// Public API URL
pub public_api_url: String,
/// Refresh interval for the feature flag spec
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub refresh_interval: Option<Duration>,
}
/// `pageserver.toml`
@@ -337,16 +360,21 @@ pub struct TimelineImportConfig {
pub struct BasebackupCacheConfig {
#[serde(with = "humantime_serde")]
pub cleanup_period: Duration,
// FIXME: Support max_size_bytes.
// pub max_size_bytes: usize,
pub max_size_entries: i64,
/// Maximum total size of basebackup cache entries on disk in bytes.
/// The cache may slightly exceed this limit because we do not know
/// the exact size of the cache entry untill it's written to disk.
pub max_total_size_bytes: u64,
// TODO(diko): support max_entry_size_bytes.
// pub max_entry_size_bytes: u64,
pub max_size_entries: usize,
}
impl Default for BasebackupCacheConfig {
fn default() -> Self {
Self {
cleanup_period: Duration::from_secs(60),
// max_size_bytes: 1024 * 1024 * 1024, // 1 GiB
max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB
// max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB
max_size_entries: 1000,
}
}
@@ -792,7 +820,7 @@ pub mod tenant_conf_defaults {
// By default ingest enough WAL for two new L0 layers before checking if new image
// image layers should be created.
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_GC_COMPACTION_ENABLED: bool = false;
pub const DEFAULT_GC_COMPACTION_ENABLED: bool = true;
pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true;
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;

View File

@@ -14,6 +14,8 @@ fn test_node_metadata_v1_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: None,
@@ -37,6 +39,35 @@ fn test_node_metadata_v2_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),
other: HashMap::new(),
}
)
}
#[test]
fn test_node_metadata_v3_backward_compatibilty() {
let v3 = serde_json::to_vec(&serde_json::json!({
"host": "localhost",
"port": 23,
"grpc_host": "localhost",
"grpc_port": 51,
"http_host": "localhost",
"http_port": 42,
"https_port": 123,
}));
assert_eq!(
serde_json::from_slice::<NodeMetadata>(&v3.unwrap()).unwrap(),
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: Some("localhost".to_string()),
grpc_port: Some(51),
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),

View File

@@ -52,6 +52,8 @@ pub struct NodeRegisterRequest {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -101,6 +103,8 @@ pub struct TenantLocateResponseShard {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -152,6 +156,8 @@ pub struct NodeDescribeResponse {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
}
#[derive(Serialize, Deserialize, Debug)]
@@ -571,8 +577,7 @@ mod test {
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
"expect unknown field `unknown_field` error, got: {err}"
);
}

View File

@@ -4,8 +4,8 @@ use std::ops::Range;
use anyhow::{Result, bail};
use byteorder::{BE, ByteOrder};
use bytes::Bytes;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::{Oid, RepOriginId};
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi_types::{Oid, RepOriginId};
use serde::{Deserialize, Serialize};
use utils::const_assert;
@@ -194,7 +194,7 @@ impl Key {
/// will be rejected on the write path.
#[allow(dead_code)]
pub fn is_valid_key_on_write_path_strong(&self) -> bool {
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
if !self.is_i128_representable() {
return false;
}

View File

@@ -1,7 +1,6 @@
use std::ops::Range;
use itertools::Itertools;
use postgres_ffi::BLCKSZ;
use crate::key::Key;
use crate::shard::{ShardCount, ShardIdentity};
@@ -269,9 +268,13 @@ impl KeySpace {
/// Partition a key space into roughly chunks of roughly 'target_size' bytes
/// in each partition.
///
pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
// Assume that each value is 8k in size.
let target_nblocks = (target_size / BLCKSZ as u64) as u32;
pub fn partition(
&self,
shard_identity: &ShardIdentity,
target_size: u64,
block_size: u64,
) -> KeyPartitioning {
let target_nblocks = (target_size / block_size) as u32;
let mut parts = Vec::new();
let mut current_part = Vec::new();
@@ -331,8 +334,7 @@ impl KeySpace {
std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
assert!(
!overlap,
"Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
prev, range
"Attempt to merge ovelapping keyspaces: {prev:?} overlaps {range:?}"
);
}
@@ -1101,7 +1103,7 @@ mod tests {
// total range contains at least one shard-local page
let all_nonzero = fragments.iter().all(|f| f.0 > 0);
if !all_nonzero {
eprintln!("Found a zero-length fragment: {:?}", fragments);
eprintln!("Found a zero-length fragment: {fragments:?}");
}
assert!(all_nonzero);
} else {

View File

@@ -5,11 +5,10 @@ pub mod controller_api;
pub mod key;
pub mod keyspace;
pub mod models;
pub mod record;
pub mod pagestream_api;
pub mod reltag;
pub mod shard;
/// Public API types
pub mod upcall_api;
pub mod value;
pub mod config;

View File

@@ -5,16 +5,12 @@ pub mod utilization;
use core::ops::Range;
use std::collections::HashMap;
use std::fmt::Display;
use std::io::{BufRead, Read};
use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
#[cfg(feature = "testing")]
use camino::Utf8PathBuf;
use postgres_ffi::BLCKSZ;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
pub use utilization::PageserverUtilization;
@@ -24,7 +20,6 @@ use utils::{completion, serde_system_time};
use crate::config::Ratio;
use crate::key::{CompactKey, Key};
use crate::reltag::RelTag;
use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
/// The state of a tenant in this pageserver.
@@ -1187,7 +1182,7 @@ impl Display for ImageCompressionAlgorithm {
ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
ImageCompressionAlgorithm::Zstd { level } => {
if let Some(level) = level {
write!(f, "zstd({})", level)
write!(f, "zstd({level})")
} else {
write!(f, "zstd")
}
@@ -1907,219 +1902,6 @@ pub struct ScanDisposableKeysResponse {
pub not_disposable_count: usize,
}
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
#[cfg(feature = "testing")]
Test(PagestreamTestRequest),
}
// Wrapped in libpq CopyData
#[derive(Debug, strum_macros::EnumProperty)]
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
#[cfg(feature = "testing")]
Test(PagestreamTestResponse),
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamFeMessageTag {
Exists = 0,
Nblocks = 1,
GetPage = 2,
DbSize = 3,
GetSlruSegment = 4,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 99,
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamBeMessageTag {
Exists = 100,
Nblocks = 101,
GetPage = 102,
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 199,
}
impl TryFrom<u8> for PagestreamFeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
0 => Ok(PagestreamFeMessageTag::Exists),
1 => Ok(PagestreamFeMessageTag::Nblocks),
2 => Ok(PagestreamFeMessageTag::GetPage),
3 => Ok(PagestreamFeMessageTag::DbSize),
4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
99 => Ok(PagestreamFeMessageTag::Test),
_ => Err(value),
}
}
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
100 => Ok(PagestreamBeMessageTag::Exists),
101 => Ok(PagestreamBeMessageTag::Nblocks),
102 => Ok(PagestreamBeMessageTag::GetPage),
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
199 => Ok(PagestreamBeMessageTag::Test),
_ => Err(value),
}
}
}
// A GetPage request contains two LSN values:
//
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
// "get the latest version present". It's used by the primary server, which knows that no one else
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
//
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
// modified between 'not_modified_since' and the request LSN. It's always correct to set
// 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
// request without waiting for 'request_lsn' to arrive.
//
// The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
// standby to request a page at a particular non-latest LSN, and also include the
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
// interface allows sending both LSNs, and let the pageserver do the right thing. There was no
// difference in the responses between V1 and V2.
//
// V3 version of protocol adds request ID to all requests. This request ID is also included in response
// as well as other fields from requests, which allows to verify that we receive response for our request.
// We copy fields from request to response to make checking more reliable: request ID is formed from process ID
// and local counter, so in principle there can be duplicated requests IDs if process PID is reused.
//
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PagestreamProtocolVersion {
V2,
V3,
}
pub type RequestId = u64;
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamRequest {
pub reqid: RequestId,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamExistsRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamNblocksRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
}
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetPageRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamDbSizeRequest {
pub hdr: PagestreamRequest,
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetSlruSegmentRequest {
pub hdr: PagestreamRequest,
pub kind: u8,
pub segno: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub req: PagestreamExistsRequest,
pub exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
pub req: PagestreamNblocksRequest,
pub n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
pub req: PagestreamGetPageRequest,
pub page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
pub req: PagestreamGetSlruSegmentRequest,
pub segment: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub req: PagestreamRequest,
pub message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
pub req: PagestreamDbSizeRequest,
pub db_size: i64,
}
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
pub hdr: PagestreamRequest,
pub batch_key: u64,
pub message: String,
}
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
pub req: PagestreamTestRequest,
}
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
// that require pageserver-internal types. It is sufficient to get the total size.
#[derive(Serialize, Deserialize, Debug)]
@@ -2131,506 +1913,6 @@ pub struct TenantHistorySize {
pub size: Option<u64>,
}
impl PagestreamFeMessage {
/// Serialize a compute -> pageserver message. This is currently only used in testing
/// tools. Always uses protocol version 3.
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u64(req.batch_key);
let message = req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(
body: &mut R,
protocol_version: PagestreamProtocolVersion,
) -> anyhow::Result<PagestreamFeMessage> {
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let (reqid, request_lsn, not_modified_since) = match protocol_version {
PagestreamProtocolVersion::V2 => (
0,
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
PagestreamProtocolVersion::V3 => (
body.read_u64::<BigEndian>()?,
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
};
match PagestreamFeMessageTag::try_from(msg_tag)
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
{
PagestreamFeMessageTag::Exists => {
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::Nblocks => {
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::GetPage => {
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::DbSize => {
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
#[cfg(feature = "testing")]
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key: body.read_u64::<BigEndian>()?,
message: {
let len = body.read_u64::<BigEndian>()?;
let mut buf = vec![0; len as usize];
body.read_exact(&mut buf)?;
String::from_utf8(buf)?
},
})),
}
}
}
impl PagestreamBeMessage {
pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
let mut bytes = BytesMut::new();
use PagestreamBeMessageTag as Tag;
match protocol_version {
PagestreamProtocolVersion::V2 => {
match self {
Self::Exists(resp) => {
bytes.put_u8(Tag::Exists as u8);
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(Tag::Nblocks as u8);
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put(&resp.page[..])
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(Tag::DbSize as u8);
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
PagestreamProtocolVersion::V3 => {
match self {
Self::Exists(resp) => {
bytes.put_u8(Tag::Exists as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(Tag::Nblocks as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u32(resp.req.blkno);
bytes.put(&resp.page[..])
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put_u64(resp.req.reqid);
bytes.put_u64(resp.req.request_lsn.0);
bytes.put_u64(resp.req.not_modified_since.0);
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(Tag::DbSize as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.dbnode);
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u8(resp.req.kind);
bytes.put_u32(resp.req.segno);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
}
bytes.into()
}
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
let mut buf = buf.reader();
let msg_tag = buf.read_u8()?;
use PagestreamBeMessageTag as Tag;
let ok =
match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
Tag::Exists => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let exists = buf.read_u8()? != 0;
Self::Exists(PagestreamExistsResponse {
req: PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
},
exists,
})
}
Tag::Nblocks => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let n_blocks = buf.read_u32::<BigEndian>()?;
Self::Nblocks(PagestreamNblocksResponse {
req: PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
},
n_blocks,
})
}
Tag::GetPage => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let blkno = buf.read_u32::<BigEndian>()?;
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
buf.read_exact(&mut page)?;
Self::GetPage(PagestreamGetPageResponse {
req: PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
blkno,
},
page: page.into(),
})
}
Tag::Error => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let mut msg = Vec::new();
buf.read_until(0, &mut msg)?;
let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
let rust_str = cstring.to_str()?;
Self::Error(PagestreamErrorResponse {
req: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
message: rust_str.to_owned(),
})
}
Tag::DbSize => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let dbnode = buf.read_u32::<BigEndian>()?;
let db_size = buf.read_i64::<BigEndian>()?;
Self::DbSize(PagestreamDbSizeResponse {
req: PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode,
},
db_size,
})
}
Tag::GetSlruSegment => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let kind = buf.read_u8()?;
let segno = buf.read_u32::<BigEndian>()?;
let n_blocks = buf.read_u32::<BigEndian>()?;
let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
buf.read_exact(&mut segment)?;
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
req: PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
kind,
segno,
},
segment: segment.into(),
})
}
#[cfg(feature = "testing")]
Tag::Test => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let batch_key = buf.read_u64::<BigEndian>()?;
let len = buf.read_u64::<BigEndian>()?;
let mut msg = vec![0; len as usize];
buf.read_exact(&mut msg)?;
let message = String::from_utf8(msg)?;
Self::Test(PagestreamTestResponse {
req: PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key,
message,
},
})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
anyhow::bail!(
"remaining bytes in msg with tag={msg_tag}: {}",
remaining.len()
);
}
Ok(ok)
}
pub fn kind(&self) -> &'static str {
match self {
Self::Exists(_) => "Exists",
Self::Nblocks(_) => "Nblocks",
Self::GetPage(_) => "GetPage",
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
#[cfg(feature = "testing")]
Self::Test(_) => "Test",
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PageTraceEvent {
pub key: CompactKey,
@@ -2656,68 +1938,6 @@ mod tests {
use super::*;
#[test]
fn test_pagestream() {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(4),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
dbnode: 7,
}),
];
for msg in messages {
let bytes = msg.serialize();
let reconstructed =
PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
.unwrap();
assert!(msg == reconstructed);
}
}
#[test]
fn test_tenantinfo_serde() {
// Test serialization/deserialization of TenantInfo
@@ -2791,8 +2011,7 @@ mod tests {
let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
"expect unknown field `unknown_field` error, got: {err}"
);
}

View File

@@ -0,0 +1,798 @@
//! Rust definitions of the libpq-based pagestream API
//!
//! See also the C implementation of the same API in pgxn/neon/pagestore_client.h
use std::io::{BufRead, Read};
use crate::reltag::RelTag;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use utils::lsn::Lsn;
/// Block size.
///
/// XXX: We assume 8k block size in the SLRU fetch API. It's not great to hardcode
/// that in the protocol, because Postgres supports different block sizes as a compile
/// time option.
const BLCKSZ: usize = 8192;
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
#[cfg(feature = "testing")]
Test(PagestreamTestRequest),
}
// Wrapped in libpq CopyData
#[derive(Debug, strum_macros::EnumProperty)]
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
#[cfg(feature = "testing")]
Test(PagestreamTestResponse),
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamFeMessageTag {
Exists = 0,
Nblocks = 1,
GetPage = 2,
DbSize = 3,
GetSlruSegment = 4,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 99,
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamBeMessageTag {
Exists = 100,
Nblocks = 101,
GetPage = 102,
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 199,
}
impl TryFrom<u8> for PagestreamFeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
0 => Ok(PagestreamFeMessageTag::Exists),
1 => Ok(PagestreamFeMessageTag::Nblocks),
2 => Ok(PagestreamFeMessageTag::GetPage),
3 => Ok(PagestreamFeMessageTag::DbSize),
4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
99 => Ok(PagestreamFeMessageTag::Test),
_ => Err(value),
}
}
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
100 => Ok(PagestreamBeMessageTag::Exists),
101 => Ok(PagestreamBeMessageTag::Nblocks),
102 => Ok(PagestreamBeMessageTag::GetPage),
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
199 => Ok(PagestreamBeMessageTag::Test),
_ => Err(value),
}
}
}
// A GetPage request contains two LSN values:
//
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
// "get the latest version present". It's used by the primary server, which knows that no one else
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
//
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
// modified between 'not_modified_since' and the request LSN. It's always correct to set
// 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
// request without waiting for 'request_lsn' to arrive.
//
// The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
// standby to request a page at a particular non-latest LSN, and also include the
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
// interface allows sending both LSNs, and let the pageserver do the right thing. There was no
// difference in the responses between V1 and V2.
//
// V3 version of protocol adds request ID to all requests. This request ID is also included in response
// as well as other fields from requests, which allows to verify that we receive response for our request.
// We copy fields from request to response to make checking more reliable: request ID is formed from process ID
// and local counter, so in principle there can be duplicated requests IDs if process PID is reused.
//
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PagestreamProtocolVersion {
V2,
V3,
}
pub type RequestId = u64;
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamRequest {
pub reqid: RequestId,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamExistsRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamNblocksRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
}
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetPageRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamDbSizeRequest {
pub hdr: PagestreamRequest,
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetSlruSegmentRequest {
pub hdr: PagestreamRequest,
pub kind: u8,
pub segno: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub req: PagestreamExistsRequest,
pub exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
pub req: PagestreamNblocksRequest,
pub n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
pub req: PagestreamGetPageRequest,
pub page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
pub req: PagestreamGetSlruSegmentRequest,
pub segment: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub req: PagestreamRequest,
pub message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
pub req: PagestreamDbSizeRequest,
pub db_size: i64,
}
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
pub hdr: PagestreamRequest,
pub batch_key: u64,
pub message: String,
}
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
pub req: PagestreamTestRequest,
}
impl PagestreamFeMessage {
/// Serialize a compute -> pageserver message. This is currently only used in testing
/// tools. Always uses protocol version 3.
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u64(req.batch_key);
let message = req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(
body: &mut R,
protocol_version: PagestreamProtocolVersion,
) -> anyhow::Result<PagestreamFeMessage> {
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let (reqid, request_lsn, not_modified_since) = match protocol_version {
PagestreamProtocolVersion::V2 => (
0,
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
PagestreamProtocolVersion::V3 => (
body.read_u64::<BigEndian>()?,
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
};
match PagestreamFeMessageTag::try_from(msg_tag)
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
{
PagestreamFeMessageTag::Exists => {
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::Nblocks => {
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::GetPage => {
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::DbSize => {
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
#[cfg(feature = "testing")]
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key: body.read_u64::<BigEndian>()?,
message: {
let len = body.read_u64::<BigEndian>()?;
let mut buf = vec![0; len as usize];
body.read_exact(&mut buf)?;
String::from_utf8(buf)?
},
})),
}
}
}
impl PagestreamBeMessage {
pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
let mut bytes = BytesMut::new();
use PagestreamBeMessageTag as Tag;
match protocol_version {
PagestreamProtocolVersion::V2 => {
match self {
Self::Exists(resp) => {
bytes.put_u8(Tag::Exists as u8);
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(Tag::Nblocks as u8);
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put(&resp.page[..])
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(Tag::DbSize as u8);
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
PagestreamProtocolVersion::V3 => {
match self {
Self::Exists(resp) => {
bytes.put_u8(Tag::Exists as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(Tag::Nblocks as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u32(resp.req.blkno);
bytes.put(&resp.page[..])
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put_u64(resp.req.reqid);
bytes.put_u64(resp.req.request_lsn.0);
bytes.put_u64(resp.req.not_modified_since.0);
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(Tag::DbSize as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.dbnode);
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u8(resp.req.kind);
bytes.put_u32(resp.req.segno);
bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
}
bytes.into()
}
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
let mut buf = buf.reader();
let msg_tag = buf.read_u8()?;
use PagestreamBeMessageTag as Tag;
let ok =
match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
Tag::Exists => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let exists = buf.read_u8()? != 0;
Self::Exists(PagestreamExistsResponse {
req: PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
},
exists,
})
}
Tag::Nblocks => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let n_blocks = buf.read_u32::<BigEndian>()?;
Self::Nblocks(PagestreamNblocksResponse {
req: PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
},
n_blocks,
})
}
Tag::GetPage => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let blkno = buf.read_u32::<BigEndian>()?;
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
buf.read_exact(&mut page)?;
Self::GetPage(PagestreamGetPageResponse {
req: PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
blkno,
},
page: page.into(),
})
}
Tag::Error => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let mut msg = Vec::new();
buf.read_until(0, &mut msg)?;
let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
let rust_str = cstring.to_str()?;
Self::Error(PagestreamErrorResponse {
req: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
message: rust_str.to_owned(),
})
}
Tag::DbSize => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let dbnode = buf.read_u32::<BigEndian>()?;
let db_size = buf.read_i64::<BigEndian>()?;
Self::DbSize(PagestreamDbSizeResponse {
req: PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode,
},
db_size,
})
}
Tag::GetSlruSegment => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let kind = buf.read_u8()?;
let segno = buf.read_u32::<BigEndian>()?;
let n_blocks = buf.read_u32::<BigEndian>()?;
let mut segment = vec![0; n_blocks as usize * BLCKSZ];
buf.read_exact(&mut segment)?;
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
req: PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
kind,
segno,
},
segment: segment.into(),
})
}
#[cfg(feature = "testing")]
Tag::Test => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let batch_key = buf.read_u64::<BigEndian>()?;
let len = buf.read_u64::<BigEndian>()?;
let mut msg = vec![0; len as usize];
buf.read_exact(&mut msg)?;
let message = String::from_utf8(msg)?;
Self::Test(PagestreamTestResponse {
req: PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key,
message,
},
})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
anyhow::bail!(
"remaining bytes in msg with tag={msg_tag}: {}",
remaining.len()
);
}
Ok(ok)
}
pub fn kind(&self) -> &'static str {
match self {
Self::Exists(_) => "Exists",
Self::Nblocks(_) => "Nblocks",
Self::GetPage(_) => "GetPage",
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
#[cfg(feature = "testing")]
Self::Test(_) => "Test",
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pagestream() {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(4),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
dbnode: 7,
}),
];
for msg in messages {
let bytes = msg.serialize();
let reconstructed =
PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
.unwrap();
assert!(msg == reconstructed);
}
}
}

View File

@@ -1,9 +1,9 @@
use std::cmp::Ordering;
use std::fmt;
use postgres_ffi::Oid;
use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
use postgres_ffi::relfile_utils::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name};
use postgres_ffi_types::Oid;
use postgres_ffi_types::constants::GLOBALTABLESPACE_OID;
use postgres_ffi_types::forknum::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name};
use serde::{Deserialize, Serialize};
///

View File

@@ -35,7 +35,7 @@ use std::hash::{Hash, Hasher};
#[doc(inline)]
pub use ::utils::shard::*;
use postgres_ffi::relfile_utils::INIT_FORKNUM;
use postgres_ffi_types::forknum::INIT_FORKNUM;
use serde::{Deserialize, Serialize};
use crate::key::Key;

View File

@@ -9,7 +9,7 @@ use utils::id::{NodeId, TimelineId};
use crate::controller_api::NodeRegisterRequest;
use crate::models::{LocationConfigMode, ShardImportStatus};
use crate::shard::TenantShardId;
use crate::shard::{ShardStripeSize, TenantShardId};
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
/// startup.
@@ -23,19 +23,13 @@ pub struct ReAttachRequest {
pub register: Option<NodeRegisterRequest>,
}
fn default_mode() -> LocationConfigMode {
LocationConfigMode::AttachedSingle
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ReAttachResponseTenant {
pub id: TenantShardId,
/// Mandatory if LocationConfigMode is None or set to an Attached* mode
pub r#gen: Option<u32>,
/// Default value only for backward compat: this field should be set
#[serde(default = "default_mode")]
pub mode: LocationConfigMode,
pub stripe_size: ShardStripeSize,
}
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponse {

View File

@@ -939,7 +939,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackendReader<IO> {
FeMessage::CopyFail => Err(CopyStreamHandlerEnd::CopyFail),
FeMessage::Terminate => Err(CopyStreamHandlerEnd::Terminate),
_ => Err(CopyStreamHandlerEnd::from(ConnectionError::Protocol(
ProtocolError::Protocol(format!("unexpected message in COPY stream {:?}", msg)),
ProtocolError::Protocol(format!("unexpected message in COPY stream {msg:?}")),
))),
},
None => Err(CopyStreamHandlerEnd::EOF),

View File

@@ -61,7 +61,7 @@ async fn simple_select() {
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -137,7 +137,7 @@ async fn simple_select_ssl() {
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});

View File

@@ -223,7 +223,7 @@ mod tests_pg_connection_config {
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "stub.host.example:123");
assert_eq!(
format!("{:?}", cfg),
format!("{cfg:?}"),
"PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: None }"
);
}
@@ -239,7 +239,7 @@ mod tests_pg_connection_config {
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "[::1]:123");
assert_eq!(
format!("{:?}", cfg),
format!("{cfg:?}"),
"PgConnectionConfig { host: Ipv6(::1), port: 123, password: None }"
);
}
@@ -252,7 +252,7 @@ mod tests_pg_connection_config {
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "stub.host.example:123");
assert_eq!(
format!("{:?}", cfg),
format!("{cfg:?}"),
"PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: Some(REDACTED-STRING) }"
);
}

View File

@@ -16,6 +16,7 @@ memoffset.workspace = true
pprof.workspace = true
thiserror.workspace = true
serde.workspace = true
postgres_ffi_types.workspace = true
utils.workspace = true
tracing.workspace = true

View File

@@ -11,11 +11,7 @@
use crate::{BLCKSZ, PageHeaderData};
//
// From pg_tablespace_d.h
//
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664;
// Note: There are a few more widely-used constants in the postgres_ffi_types::constants crate.
// From storage_xlog.h
pub const XLOG_SMGR_CREATE: u8 = 0x10;

View File

@@ -4,50 +4,7 @@
use once_cell::sync::OnceCell;
use regex::Regex;
//
// Fork numbers, from relpath.h
//
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError {
#[error("invalid relation fork name")]
InvalidForkName,
#[error("invalid relation data file name")]
InvalidFileName,
}
impl From<core::num::ParseIntError> for FilePathError {
fn from(_e: core::num::ParseIntError) -> Self {
FilePathError::InvalidFileName
}
}
/// Convert Postgres relation file's fork suffix to fork number.
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(MAIN_FORKNUM),
Some("fsm") => Ok(FSM_FORKNUM),
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
Some("init") => Ok(INIT_FORKNUM),
Some(_) => Err(FilePathError::InvalidForkName),
}
}
/// Convert Postgres fork number to the right suffix of the relation data file.
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
match forknum {
MAIN_FORKNUM => None,
FSM_FORKNUM => Some("fsm"),
VISIBILITYMAP_FORKNUM => Some("vm"),
INIT_FORKNUM => Some("init"),
_ => Some("UNKNOWN FORKNUM"),
}
}
use postgres_ffi_types::forknum::*;
/// Parse a filename of a relation file. Returns (relfilenode, forknum, segno) tuple.
///
@@ -75,7 +32,9 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
.ok_or(FilePathError::InvalidFileName)?;
let relnode_str = caps.name("relnode").unwrap().as_str();
let relnode = relnode_str.parse::<u32>()?;
let relnode = relnode_str
.parse::<u32>()
.map_err(|_e| FilePathError::InvalidFileName)?;
let forkname = caps.name("forkname").map(|f| f.as_str());
let forknum = forkname_to_number(forkname)?;
@@ -84,7 +43,11 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
let segno = if segno_match.is_none() {
0
} else {
segno_match.unwrap().as_str().parse::<u32>()?
segno_match
.unwrap()
.as_str()
.parse::<u32>()
.map_err(|_e| FilePathError::InvalidFileName)?
};
Ok((relnode, forknum, segno))

View File

@@ -114,7 +114,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
|e| WalDecodeError {
msg: format!("long header deserialization failed {}", e),
msg: format!("long header deserialization failed {e}"),
lsn: self.lsn,
},
)?;
@@ -130,7 +130,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let hdr =
XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
WalDecodeError {
msg: format!("header deserialization failed {}", e),
msg: format!("header deserialization failed {e}"),
lsn: self.lsn,
}
})?;
@@ -155,7 +155,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
return Err(WalDecodeError {
msg: format!("invalid xl_tot_len {}", xl_tot_len),
msg: format!("invalid xl_tot_len {xl_tot_len}"),
lsn: self.lsn,
});
}
@@ -218,7 +218,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
msg: format!("xlog record deserialization failed {e}"),
lsn: self.lsn,
}
})?;

View File

@@ -1199,7 +1199,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
_ => {
unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
&unknown_str
}
}
@@ -1212,7 +1212,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
_ => {
unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
&unknown_str
}
}
@@ -1223,7 +1223,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
pg_constants::XLOG_FPI => "XLOG FPI",
pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
_ => {
unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
unknown_str = format!("XLOG UNKNOWN_0x{info:02x}");
&unknown_str
}
}
@@ -1231,7 +1231,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
rmid => {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
unknown_str = format!("UNKNOWN_RM_{rmid} INFO_0x{info:02x}");
&unknown_str
}
};

View File

@@ -34,7 +34,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
let cfg = Conf {
pg_version,
pg_distrib_dir: top_path.join("pg_install"),
datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
datadir: top_path.join(format!("test_output/{test_name}-{PG_MAJORVERSION}")),
};
if cfg.datadir.exists() {
fs::remove_dir_all(&cfg.datadir).unwrap();

View File

@@ -0,0 +1,11 @@
[package]
name = "postgres_ffi_types"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
thiserror.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[dev-dependencies]

View File

@@ -0,0 +1,8 @@
//! Misc constants, copied from PostgreSQL headers.
//!
//! Any constants included here must be the same in all PostgreSQL versions and unlikely to change
//! in the future either!
// From pg_tablespace_d.h
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664;

View File

@@ -0,0 +1,36 @@
// Fork numbers, from relpath.h
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError {
#[error("invalid relation fork name")]
InvalidForkName,
#[error("invalid relation data file name")]
InvalidFileName,
}
/// Convert Postgres relation file's fork suffix to fork number.
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(MAIN_FORKNUM),
Some("fsm") => Ok(FSM_FORKNUM),
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
Some("init") => Ok(INIT_FORKNUM),
Some(_) => Err(FilePathError::InvalidForkName),
}
}
/// Convert Postgres fork number to the right suffix of the relation data file.
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
match forknum {
MAIN_FORKNUM => None,
FSM_FORKNUM => Some("fsm"),
VISIBILITYMAP_FORKNUM => Some("vm"),
INIT_FORKNUM => Some("init"),
_ => Some("UNKNOWN FORKNUM"),
}
}

View File

@@ -0,0 +1,13 @@
//! This package contains some PostgreSQL constants and datatypes that are the same in all versions
//! of PostgreSQL and unlikely to change in the future either. These could be derived from the
//! PostgreSQL headers with 'bindgen', but in order to avoid proliferating the dependency to bindgen
//! and the PostgreSQL C headers to all services, we prefer to have this small stand-alone crate for
//! them instead.
//!
//! Be mindful in what you add here, as these types are deeply ingrained in the APIs.
pub mod constants;
pub mod forknum;
pub type Oid = u32;
pub type RepOriginId = u16;

View File

@@ -31,15 +31,15 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Spawn(e) => write!(f, "Error spawning command: {:?}", e),
Error::Spawn(e) => write!(f, "Error spawning command: {e:?}"),
Error::Failed { status, stderr } => write!(
f,
"Command failed with status {:?}: {}",
status,
String::from_utf8_lossy(stderr)
),
Error::WaitOutput(e) => write!(f, "Error waiting for command output: {:?}", e),
Error::Other(e) => write!(f, "Error: {:?}", e),
Error::WaitOutput(e) => write!(f, "Error waiting for command output: {e:?}"),
Error::Other(e) => write!(f, "Error: {e:?}"),
}
}
}

View File

@@ -36,7 +36,10 @@ impl FeatureResolverBackgroundLoop {
// Main loop of updating the feature flags.
handle.spawn(
async move {
tracing::info!("Starting PostHog feature resolver");
tracing::info!(
"Starting PostHog feature resolver with refresh period: {:?}",
refresh_period
);
let mut ticker = tokio::time::interval(refresh_period);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
@@ -55,9 +58,16 @@ impl FeatureResolverBackgroundLoop {
continue;
}
};
let feature_store = FeatureStore::new_with_flags(resp.flags);
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
let project_id = this.posthog_client.config.project_id.parse::<u64>().ok();
match FeatureStore::new_with_flags(resp.flags, project_id) {
Ok(feature_store) => {
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
}
Err(e) => {
tracing::warn!("Cannot process feature flag spec: {}", e);
}
}
}
tracing::info!("PostHog feature resolver stopped");
}

View File

@@ -39,6 +39,9 @@ pub struct LocalEvaluationResponse {
#[derive(Deserialize)]
pub struct LocalEvaluationFlag {
#[allow(dead_code)]
id: u64,
team_id: u64,
key: String,
filters: LocalEvaluationFlagFilters,
active: bool,
@@ -107,17 +110,32 @@ impl FeatureStore {
}
}
pub fn new_with_flags(flags: Vec<LocalEvaluationFlag>) -> Self {
pub fn new_with_flags(
flags: Vec<LocalEvaluationFlag>,
project_id: Option<u64>,
) -> Result<Self, &'static str> {
let mut store = Self::new();
store.set_flags(flags);
store
store.set_flags(flags, project_id)?;
Ok(store)
}
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
pub fn set_flags(
&mut self,
flags: Vec<LocalEvaluationFlag>,
project_id: Option<u64>,
) -> Result<(), &'static str> {
self.flags.clear();
for flag in flags {
if let Some(project_id) = project_id {
if flag.team_id != project_id {
return Err(
"Retrieved a spec with different project id, wrong config? Discarding the feature flags.",
);
}
}
self.flags.insert(flag.key.clone(), flag);
}
Ok(())
}
/// Generate a consistent hash for a user ID (e.g., tenant ID).
@@ -150,15 +168,13 @@ impl FeatureStore {
let PostHogFlagFilterPropertyValue::String(provided) = provided else {
// Left should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a string: {:?}",
provided
"The left side of the condition is not a string: {provided:?}"
)));
};
let PostHogFlagFilterPropertyValue::List(requested) = requested else {
// Right should be a list of string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a list: {:?}",
requested
"The right side of the condition is not a list: {requested:?}"
)));
};
Ok(requested.contains(provided))
@@ -167,14 +183,12 @@ impl FeatureStore {
let PostHogFlagFilterPropertyValue::String(requested) = requested else {
// Right should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a string: {:?}",
requested
"The right side of the condition is not a string: {requested:?}"
)));
};
let Ok(requested) = requested.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the right side of the condition as a number: {:?}",
requested
"Can not parse the right side of the condition as a number: {requested:?}"
)));
};
// Left can either be a number or a string
@@ -183,16 +197,14 @@ impl FeatureStore {
PostHogFlagFilterPropertyValue::String(provided) => {
let Ok(provided) = provided.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the left side of the condition as a number: {:?}",
provided
"Can not parse the left side of the condition as a number: {provided:?}"
)));
};
provided
}
_ => {
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a number or a string: {:?}",
provided
"The left side of the condition is not a number or a string: {provided:?}"
)));
}
};
@@ -200,14 +212,12 @@ impl FeatureStore {
"lt" => Ok(provided < requested),
"gt" => Ok(provided > requested),
op => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
op
"Unsupported operator: {op}"
))),
}
}
_ => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
operator
"Unsupported operator: {operator}"
))),
}
}
@@ -355,8 +365,7 @@ impl FeatureStore {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
"The feature flag is not active: {flag_key}"
)));
}
let Some(ref multivariate) = flag_config.filters.multivariate else {
@@ -383,8 +392,7 @@ impl FeatureStore {
// This should not happen because the rollout percentage always adds up to 100, but just in case that PostHog
// returned invalid spec, we return an error.
return Err(PostHogEvaluationError::Internal(format!(
"Rollout percentage does not add up to 100: {}",
flag_key
"Rollout percentage does not add up to 100: {flag_key}"
)));
}
GroupEvaluationResult::Unmatched => continue,
@@ -395,8 +403,7 @@ impl FeatureStore {
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
"Not found in the local evaluation spec: {flag_key}"
)))
}
}
@@ -422,8 +429,7 @@ impl FeatureStore {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
"The feature flag is not active: {flag_key}"
)));
}
if flag_config.filters.multivariate.is_some() {
@@ -438,8 +444,7 @@ impl FeatureStore {
match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? {
GroupEvaluationResult::MatchedAndOverride(_) => {
return Err(PostHogEvaluationError::Internal(format!(
"Boolean flag cannot have overrides: {}",
flag_key
"Boolean flag cannot have overrides: {flag_key}"
)));
}
GroupEvaluationResult::MatchedAndEvaluate => {
@@ -453,8 +458,7 @@ impl FeatureStore {
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
"Not found in the local evaluation spec: {flag_key}"
)))
}
}
@@ -465,8 +469,7 @@ impl FeatureStore {
Ok(flag_config.filters.multivariate.is_none())
} else {
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
"Not found in the local evaluation spec: {flag_key}"
)))
}
}
@@ -534,6 +537,13 @@ impl PostHogClient {
})
}
/// Check if the server API key is a feature flag secure API key. This key can only be
/// used to fetch the feature flag specs and can only be used on a undocumented API
/// endpoint.
fn is_feature_flag_secure_api_key(&self) -> bool {
self.config.server_api_key.starts_with("phs_")
}
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
@@ -547,10 +557,22 @@ impl PostHogClient {
) -> anyhow::Result<LocalEvaluationResponse> {
// BASE_URL/api/projects/:project_id/feature_flags/local_evaluation
// with bearer token of self.server_api_key
let url = format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.config.private_api_url, self.config.project_id
);
// OR
// BASE_URL/api/feature_flag/local_evaluation/
// with bearer token of feature flag specific self.server_api_key
let url = if self.is_feature_flag_secure_api_key() {
// The new feature local evaluation secure API token
format!(
"{}/api/feature_flag/local_evaluation",
self.config.private_api_url
)
} else {
// The old personal API token
format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.config.private_api_url, self.config.project_id
)
};
let response = self
.client
.get(url)
@@ -803,7 +825,7 @@ mod tests {
fn evaluate_multivariate() {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
store.set_flags(response.flags, None).unwrap();
// This lacks the required properties and cannot be evaluated.
let variant =
@@ -873,7 +895,7 @@ mod tests {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
store.set_flags(response.flags, None).unwrap();
// This lacks the required properties and cannot be evaluated.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new());
@@ -929,7 +951,7 @@ mod tests {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
store.set_flags(response.flags, None).unwrap();
// This lacks the required properties and cannot be evaluated.
let variant =

View File

@@ -198,7 +198,7 @@ impl fmt::Display for CancelKeyData {
// This format is more compact and might work better for logs.
f.debug_tuple("CancelKeyData")
.field(&format_args!("{:x}", id))
.field(&format_args!("{id:x}"))
.finish()
}
}
@@ -291,8 +291,7 @@ impl FeMessage {
let len = (&buf[1..5]).read_u32::<BigEndian>().unwrap();
if len < 4 {
return Err(ProtocolError::Protocol(format!(
"invalid message length {}",
len
"invalid message length {len}"
)));
}
@@ -367,8 +366,7 @@ impl FeStartupPacket {
#[allow(clippy::manual_range_contains)]
if len < 8 || len > MAX_STARTUP_PACKET_LENGTH {
return Err(ProtocolError::Protocol(format!(
"invalid startup packet message length {}",
len
"invalid startup packet message length {len}"
)));
}

View File

@@ -5,7 +5,7 @@ edition = "2024"
license = "MIT/Apache-2.0"
[dependencies]
base64 = "0.20"
base64.workspace = true
byteorder.workspace = true
bytes.workspace = true
fallible-iterator.workspace = true

View File

@@ -3,6 +3,8 @@
use std::fmt::Write;
use std::{io, iter, mem, str};
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use rand::{self, Rng};
use sha2::digest::FixedOutput;
@@ -226,7 +228,7 @@ impl ScramSha256 {
let (client_key, server_key) = match password {
Credentials::Password(password) => {
let salt = match base64::decode(parsed.salt) {
let salt = match BASE64_STANDARD.decode(parsed.salt) {
Ok(salt) => salt,
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
};
@@ -255,7 +257,7 @@ impl ScramSha256 {
let mut cbind_input = vec![];
cbind_input.extend(channel_binding.gs2_header().as_bytes());
cbind_input.extend(channel_binding.cbind_data());
let cbind_input = base64::encode(&cbind_input);
let cbind_input = BASE64_STANDARD.encode(&cbind_input);
self.message.clear();
write!(&mut self.message, "c={},r={}", cbind_input, parsed.nonce).unwrap();
@@ -272,7 +274,12 @@ impl ScramSha256 {
*proof ^= signature;
}
write!(&mut self.message, ",p={}", base64::encode(client_proof)).unwrap();
write!(
&mut self.message,
",p={}",
BASE64_STANDARD.encode(client_proof)
)
.unwrap();
self.state = State::Finish {
server_key,
@@ -301,12 +308,12 @@ impl ScramSha256 {
let verifier = match parsed {
ServerFinalMessage::Error(e) => {
return Err(io::Error::other(format!("SCRAM error: {}", e)));
return Err(io::Error::other(format!("SCRAM error: {e}")));
}
ServerFinalMessage::Verifier(verifier) => verifier,
};
let verifier = match base64::decode(verifier) {
let verifier = match BASE64_STANDARD.decode(verifier) {
Ok(verifier) => verifier,
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
};
@@ -336,10 +343,8 @@ impl<'a> Parser<'a> {
match self.it.next() {
Some((_, c)) if c == target => Ok(()),
Some((i, c)) => {
let m = format!(
"unexpected character at byte {}: expected `{}` but got `{}",
i, target, c
);
let m =
format!("unexpected character at byte {i}: expected `{target}` but got `{c}");
Err(io::Error::new(io::ErrorKind::InvalidInput, m))
}
None => Err(io::Error::new(
@@ -405,7 +410,7 @@ impl<'a> Parser<'a> {
match self.it.peek() {
Some(&(i, _)) => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unexpected trailing data at byte {}", i),
format!("unexpected trailing data at byte {i}"),
)),
None => Ok(()),
}

View File

@@ -211,7 +211,7 @@ impl Message {
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown authentication tag `{}`", tag),
format!("unknown authentication tag `{tag}`"),
));
}
},
@@ -238,7 +238,7 @@ impl Message {
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown message tag `{}`", tag),
format!("unknown message tag `{tag}`"),
));
}
};

View File

@@ -6,6 +6,8 @@
//! side. This is good because it ensures the cleartext password won't
//! end up in logs pg_stat displays, etc.
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use rand::RngCore;
use sha2::digest::FixedOutput;
@@ -83,8 +85,8 @@ pub(crate) async fn scram_sha_256_salt(
format!(
"SCRAM-SHA-256${}:{}${}:{}",
SCRAM_DEFAULT_ITERATIONS,
base64::encode(salt),
base64::encode(stored_key),
base64::encode(server_key)
BASE64_STANDARD.encode(salt),
BASE64_STANDARD.encode(stored_key),
BASE64_STANDARD.encode(server_key)
)
}

View File

@@ -46,7 +46,7 @@ impl fmt::Display for Type {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.schema() {
"public" | "pg_catalog" => {}
schema => write!(fmt, "{}.", schema)?,
schema => write!(fmt, "{schema}.")?,
}
fmt.write_str(self.name())
}

View File

@@ -1,5 +1,3 @@
use std::io;
use tokio::net::TcpStream;
use crate::client::SocketConfig;
@@ -8,7 +6,7 @@ use crate::tls::MakeTlsConnect;
use crate::{Error, cancel_query_raw, connect_socket};
pub(crate) async fn cancel_query<T>(
config: Option<SocketConfig>,
config: SocketConfig,
ssl_mode: SslMode,
tls: T,
process_id: i32,
@@ -17,16 +15,6 @@ pub(crate) async fn cancel_query<T>(
where
T: MakeTlsConnect<TcpStream>,
{
let config = match config {
Some(config) => config,
None => {
return Err(Error::connect(io::Error::new(
io::ErrorKind::InvalidInput,
"unknown host",
)));
}
};
let hostname = match &config.host {
Host::Tcp(host) => &**host,
};

View File

@@ -7,11 +7,16 @@ use crate::config::SslMode;
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Error, cancel_query, cancel_query_raw};
/// The capability to request cancellation of in-progress queries on a
/// connection.
#[derive(Clone, Serialize, Deserialize)]
/// A cancellation token that allows easy cancellation of a query.
#[derive(Clone)]
pub struct CancelToken {
pub socket_config: Option<SocketConfig>,
pub socket_config: SocketConfig,
pub raw: RawCancelToken,
}
/// A raw cancellation token that allows cancellation of a query, given a fresh connection to postgres.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawCancelToken {
pub ssl_mode: SslMode,
pub process_id: i32,
pub secret_key: i32,
@@ -36,14 +41,16 @@ impl CancelToken {
{
cancel_query::cancel_query(
self.socket_config.clone(),
self.ssl_mode,
self.raw.ssl_mode,
tls,
self.process_id,
self.secret_key,
self.raw.process_id,
self.raw.secret_key,
)
.await
}
}
impl RawCancelToken {
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
/// connection itself.
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>

View File

@@ -12,6 +12,7 @@ use postgres_protocol2::message::frontend;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::cancel_token::RawCancelToken;
use crate::codec::{BackendMessages, FrontendMessage};
use crate::config::{Host, SslMode};
use crate::query::RowStream;
@@ -331,10 +332,12 @@ impl Client {
/// connection associated with this client.
pub fn cancel_token(&self) -> CancelToken {
CancelToken {
socket_config: Some(self.socket_config.clone()),
ssl_mode: self.ssl_mode,
process_id: self.process_id,
secret_key: self.secret_key,
socket_config: self.socket_config.clone(),
raw: RawCancelToken {
ssl_mode: self.ssl_mode,
process_id: self.process_id,
secret_key: self.secret_key,
},
}
}

View File

@@ -332,10 +332,10 @@ impl fmt::Display for DbError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "{}: {}", self.severity, self.message)?;
if let Some(detail) = &self.detail {
write!(fmt, "\nDETAIL: {}", detail)?;
write!(fmt, "\nDETAIL: {detail}")?;
}
if let Some(hint) = &self.hint {
write!(fmt, "\nHINT: {}", hint)?;
write!(fmt, "\nHINT: {hint}")?;
}
Ok(())
}
@@ -398,9 +398,9 @@ impl fmt::Display for Error {
Kind::Io => fmt.write_str("error communicating with the server")?,
Kind::UnexpectedMessage => fmt.write_str("unexpected message from server")?,
Kind::Tls => fmt.write_str("error performing TLS handshake")?,
Kind::ToSql(idx) => write!(fmt, "error serializing parameter {}", idx)?,
Kind::FromSql(idx) => write!(fmt, "error deserializing column {}", idx)?,
Kind::Column(column) => write!(fmt, "invalid column `{}`", column)?,
Kind::ToSql(idx) => write!(fmt, "error serializing parameter {idx}")?,
Kind::FromSql(idx) => write!(fmt, "error deserializing column {idx}")?,
Kind::Column(column) => write!(fmt, "invalid column `{column}`")?,
Kind::Closed => fmt.write_str("connection closed")?,
Kind::Db => fmt.write_str("db error")?,
Kind::Parse => fmt.write_str("error parsing response from server")?,
@@ -411,7 +411,7 @@ impl fmt::Display for Error {
Kind::Timeout => fmt.write_str("timeout waiting for server")?,
};
if let Some(ref cause) = self.0.cause {
write!(fmt, ": {}", cause)?;
write!(fmt, ": {cause}")?;
}
Ok(())
}

View File

@@ -3,7 +3,7 @@
use postgres_protocol2::message::backend::ReadyForQueryBody;
pub use crate::cancel_token::CancelToken;
pub use crate::cancel_token::{CancelToken, RawCancelToken};
pub use crate::client::{Client, SocketConfig};
pub use crate::config::Config;
pub use crate::connect_raw::RawConnection;

Some files were not shown because too many files have changed in this diff Show More