Compare commits

..

121 Commits

Author SHA1 Message Date
Erik Grinaker
f6761760a2 Documentation and tweaks 2025-07-01 17:54:41 +02:00
Erik Grinaker
0bce818d5e Add stream pool 2025-07-01 17:54:41 +02:00
Erik Grinaker
48be1da6ef Add initial client pool 2025-07-01 17:54:41 +02:00
Erik Grinaker
d2efc80e40 Add initial ChannelPool 2025-07-01 17:54:41 +02:00
Erik Grinaker
958c2577f5 pageserver: tighten up page_api::Client 2025-07-01 17:54:41 +02:00
Heikki Linnakangas
175c2e11e3 Add assertions that the legacy relsize cache is not used with new communicator
And fix a few cases where it was being called
2025-07-01 16:44:25 +03:00
Heikki Linnakangas
efdb07e7b6 Implement function to check if page is in local cache
This is needed for read replicas. There's one more TODO that needs to
implemented before read replicas work though, in
neon_extend_rel_size()
2025-07-01 16:22:51 +03:00
Heikki Linnakangas
b0970b415c Don't call legacy lfc function when new communicator is used 2025-07-01 15:47:26 +03:00
Heikki Linnakangas
7429dd711c fix the .metrics.socket filename in the ignore list 2025-06-30 23:41:09 +03:00
Heikki Linnakangas
88ac1e356b Ignore the metrics unix domain socket in tests 2025-06-30 23:39:01 +03:00
Erik Grinaker
c3cb1ab98d Merge branch 'main' into communicator-rewrite 2025-06-30 21:07:01 +02:00
Erik Grinaker
81ac4ef43a Add a generic pool prototype 2025-06-30 14:49:34 +02:00
Erik Grinaker
a5b0fc560c Fix/allow remaining clippy lints 2025-06-30 12:36:20 +02:00
Erik Grinaker
67b04f8ab3 Fix a bunch of linter warnings 2025-06-30 11:10:02 +02:00
Erik Grinaker
9d9e3cd08a Fix test_normal_work grpc param 2025-06-30 10:13:46 +02:00
Heikki Linnakangas
97a8f4ef85 Handle unexpected EOF while doing an LFC read more gracefully
There's a bug somewhere because this happens in python regression
tests. We need to hunt that down, but in any case, let's not get stuck
in an infinite loop if it happens.
2025-06-30 00:59:53 +03:00
Heikki Linnakangas
39f31957e3 Handle pageserver response with different number of pages gracefully
Some tests are hitting this case, where pageserver returns 0 page
images in the response to a GetPage request. I suspect it's because
the code doesn't handle sharding correclty? In any case, let's not
panic on it, but return an IO error to the originating backend.
2025-06-29 23:44:28 +03:00
Heikki Linnakangas
924c6a6fdf Fix handling the case that server closes the stream
- avoid panic by checking for Ok(None) response from
  tonic::Streaming::message() instead of just using unwrap()
- There was a race condition, if the caller sent the message, but the
  receiver task concurrently received Ok(None) indicating the stream
  was closed. (I didn't see that in action, but I think it could happen
  by reading the code)
2025-06-29 22:53:39 +03:00
Heikki Linnakangas
7020476bf5 Run cargo fmt 2025-06-29 22:53:09 +03:00
Heikki Linnakangas
80e948db93 Remove ununused mock factory
After reading the code a few times, I didn't quite understand what it
was, to be honest, or how it was going to be used. Remove it now to
reduce noise, but we can resurrect it from git history if we need it
in the future.
2025-06-29 22:52:48 +03:00
Heikki Linnakangas
bfb30d434c minor code tidy-up 2025-06-29 22:51:34 +03:00
Heikki Linnakangas
f3ba201800 Run cargo fmt 2025-06-29 21:21:07 +03:00
Heikki Linnakangas
8b7796cbfa wip 2025-06-29 21:20:48 +03:00
Heikki Linnakangas
fdc7e9c2a4 Extract repeated code to look up RequestTracker into a helper function 2025-06-29 21:20:14 +03:00
Heikki Linnakangas
a352d290eb Plumb through both libpq and grpc connection strings to the compute
Add a new 'pageserver_connection_info' field in the compute spec. It
replaces the old 'pageserver_connstring' field with a more complicated
struct that includes both libpq and grpc URLs, for each shard (or only
one of the the URLs, depending on the configuration). It also includes
a flag suggesting which one to use; compute_ctl now uses it to decide
which protocol to use for the basebackup.

This is compatible with everything that's in production, because the
control plane never used the 'pageserver_connstring' field. That was
added a long time ago with the idea that it would replace the code
that digs the 'neon.pageserver_connstring' GUC from the list of
Postgres settings, but we never got around to do that in the control
plane. Hence, it was only used with neon_local. But the plan now is to
pass the 'pageserver_connection_info' from the control plane, and once
that's fully deployed everywhere, the code to parse
'neon.pageserver_connstring' in compute_ctl can be removed.

The 'grpc' flag on an endpoint in endpoint config is now more of a
suggestion. Compute_ctl gets both URLs, so it can choose to use libpq
or grpc as it wishes. It currently always obeys the 'prefer_grpc' flag
that's part of the connection info though. Postgres however uses grpc
iff the new rust-based communicator is enabled.

TODO/plan for the control plane:

- Start to pass `pageserver_connection_info` in the spec file.
- Also keep the current `neon.pageserver_connstring` setting for now,
  for backwards compatibility with old computes

After that, the `pageserver_connection_info.prefer_grpc` flag in the
spec file can be used to control whether compute_ctl uses grpc or
libpq.  The actual compute's grpc usage will be controlled by the
`neon.enable_new_communicator` GUC. It can be set separately from
'prefer_grpc'.

Later:

- Once all old computes are gone, remove the code to pass
  `neon.pageserver_connstring`
2025-06-29 18:16:49 +03:00
Heikki Linnakangas
8c122a1c98 Don't call into the old LFC when using the new communicator
This fixes errors like `index "pg_class_relname_nsp_index" contains
unexpected zero page at block 2` when running the python tests

smgrzeroextend() still called into the old LFC's lfc_write() function,
even when using the new communicator, which zeroed some arbitrary
pages in the LFC file, overwriting pages managed by the new LFC
implementation managed by `integrated_cache.rs`
2025-06-29 17:40:46 +03:00
Erik Grinaker
e3ecdfbecc pgxn/neon: actually use UNAME_S 2025-06-26 12:38:44 +02:00
Erik Grinaker
d08e553835 pgxn/neon: fix callback_get_request_lsn_unsafe return type 2025-06-26 12:33:59 +02:00
Erik Grinaker
7fffb5b4df pgxn/neon: fix macOS build 2025-06-26 12:33:39 +02:00
Heikki Linnakangas
46b5c0be0b Remove duplicated migration script
I messed this up during the merge I guess?
2025-06-23 19:46:32 +03:00
Heikki Linnakangas
2d913ff125 fix some mismerges 2025-06-23 18:21:16 +03:00
Heikki Linnakangas
e90be06d46 silence a few compiler warnings
about unnecessary 'mut's and 'use's
2025-06-23 18:16:54 +03:00
Heikki Linnakangas
356ba67607 Merge remote-tracking branch 'origin/main' into HEAD
I also included build script changes from
https://github.com/neondatabase/neon/pull/12266, which is not yet
merged but will be soon.
2025-06-23 17:46:30 +03:00
Heikki Linnakangas
1847f4de54 Add missing #include.
Got a warning on macos without this
2025-06-18 17:26:20 +03:00
Heikki Linnakangas
e8af3a2811 remove unused struct in example code, to silence compiler warning 2025-06-17 02:09:21 +03:00
Heikki Linnakangas
b603e3dddb Silence compiler warnings in example code 2025-06-17 02:07:33 +03:00
Heikki Linnakangas
83007782fd fix compilation of example 2025-06-17 02:07:15 +03:00
Erik Grinaker
782062014e Fix test_normal_work endpoint restart 2025-06-16 10:16:27 +02:00
Erik Grinaker
d0b3629412 Tweak base backups 2025-06-13 13:47:26 -07:00
Erik Grinaker
f4d51c0f5c Use gRPC for test_normal_work 2025-06-09 22:51:15 +02:00
Erik Grinaker
ec17ae0658 Handle gRPC basebackups in compute_ctl 2025-06-09 22:50:57 +02:00
Erik Grinaker
9ecce60ded Plumb gRPC addr through storage-controller 2025-06-09 20:24:18 +02:00
Erik Grinaker
e74a957045 test_runner: initial gRPC protocol support 2025-06-06 16:56:33 +02:00
Erik Grinaker
396a16a3b2 test_runner: enable gRPC Pageserver 2025-06-06 14:55:29 +02:00
Elizabeth Murray
7140a50225 Minor changes to get integration tests to run for communicator. 2025-06-06 04:32:51 +02:00
Elizabeth Murray
68f18ccacf Request Tracker Prototype
Does not include splitting requests across shards.
2025-06-05 13:32:18 -07:00
Heikki Linnakangas
786888d93f Instead of a fixed TCP port for metrics, listen on a unix domain socket
That avoids clashes if you run two computes at the same time. More
secure too. We might want to have a TCP port in the long run, but this
is less trouble for now.

To see the metrics with curl you can use:

    curl --unix-socket .neon/endpoints/ep-main/pgdata/.metrics.socket http://localhost/metrics
2025-06-05 21:28:11 +03:00
Heikki Linnakangas
255537dda1 avoid hitting assertion failure in MarkPostmasterChildWalSender() 2025-06-05 20:08:32 +03:00
Erik Grinaker
8b494f6a24 Ignore communicator_bindings.h 2025-06-05 17:52:50 +02:00
Erik Grinaker
28a61741b3 Mangle gRPC connstrings to use port 51051 2025-06-05 17:46:58 +02:00
Erik Grinaker
2fb6164bf8 Misc build fixes 2025-06-05 17:22:11 +02:00
Erik Grinaker
328f28dfe5 impl Default for SlabBlockHeader 2025-06-05 17:18:28 +02:00
Erik Grinaker
95838056da Fix RelTag fields 2025-06-05 17:13:51 +02:00
Erik Grinaker
6d451654f1 Remove generated communicator_bindings.h 2025-06-05 17:12:13 +02:00
Erik Grinaker
37c58522a2 Merge branch 'main' into communicator-rewrite 2025-06-05 15:08:05 +02:00
Erik Grinaker
4b6f02e47d Merge branch 'main' into communicator-rewrite 2025-06-04 10:23:29 +02:00
Erik Grinaker
8202c6172f Merge branch 'main' into communicator-rewrite 2025-06-03 16:04:31 +02:00
Erik Grinaker
69a47d789d pageserver: remove gRPC compute service prototype 2025-06-03 13:47:21 +02:00
Erik Grinaker
b36f880710 Fix Linux build failures 2025-06-03 13:37:56 +02:00
Erik Grinaker
745b750f33 Merge branch 'main' into communicator-rewrite 2025-06-03 13:29:45 +02:00
Heikki Linnakangas
f06bb2bbd8 Implement growing the hash table. Fix unit tests. 2025-05-29 15:54:55 +03:00
Heikki Linnakangas
b3c25418a6 Add metrics to track memory usage of the rust communicator 2025-05-29 02:14:01 +03:00
Heikki Linnakangas
33549bad1d use separate hash tables for relsize cache and block mappings 2025-05-28 23:57:55 +03:00
Heikki Linnakangas
009168d711 Add placeholder shmem hashmap implementation
Use that instead of the half-baked Adaptive Radix Tree
implementation. ART would probably be better in the long run, but more
complicated to implement.
2025-05-28 11:08:35 +03:00
Elizabeth Murray
7c9bd542a6 Fix compile warnings, minor cleanup. 2025-05-26 06:30:48 -07:00
Elizabeth Murray
014823b305 Add a new iteration of a new client pool with some updates. 2025-05-26 05:29:32 -07:00
Elizabeth Murray
af9379ccf6 Use a sempahore to gate access to connections. Add metrics for testing. 2025-05-26 05:28:50 -07:00
Heikki Linnakangas
bb28109ffa Merge remote-tracking branch 'origin/main' into communicator-rewrite-with-integrated-cache
There were conflicts because of the differences in the page_api
protocol that was merged to main vs what was on the branch. I adapted
the code for the protocol in main.
2025-05-26 11:52:32 +03:00
Elizabeth Murray
60a0bec1c0 Set default max consumers per connection to a high number. 2025-05-19 07:00:39 -07:00
Elizabeth Murray
31fa7a545d Remove unnecessary info include now that the info message is gone. 2025-05-19 06:52:07 -07:00
Elizabeth Murray
ac464c5f2c Return info message that was used for debugging. 2025-05-19 06:39:16 -07:00
Elizabeth Murray
0dddb1e373 Add back whitespace that was removed. 2025-05-19 06:34:52 -07:00
Elizabeth Murray
3acb263e62 Add first iteration of simulating a flakey network with a custom TCP. 2025-05-19 06:33:30 -07:00
Elizabeth Murray
1e83398cdd Correct out-of-date comment. 2025-05-14 07:31:52 -07:00
Elizabeth Murray
be8ed81532 Connection pool: update error accounting, sweep idle connections, add config options. 2025-05-14 07:31:52 -07:00
Heikki Linnakangas
12b08c4b82 Fix shutdown 2025-05-14 01:49:55 +03:00
Heikki Linnakangas
827358dd03 Handle OOMs a little more gracefully 2025-05-12 23:33:22 +03:00
Heikki Linnakangas
d367273000 minor cleanup 2025-05-12 23:11:55 +03:00
Heikki Linnakangas
e2bad5d9e9 Add debugging HTTP endpoint for dumping the cache tree 2025-05-12 22:54:03 +03:00
Heikki Linnakangas
5623e4665b bunch of fixes 2025-05-12 18:40:54 +03:00
Heikki Linnakangas
8abb4dab6d implement shrinking nodes 2025-05-12 03:57:10 +03:00
Heikki Linnakangas
731667ac37 better metrics of the art tree 2025-05-12 02:08:51 +03:00
Heikki Linnakangas
6a1374d106 Pack tree node structs more tightly, avoiding alignment padding 2025-05-12 01:01:58 +03:00
Heikki Linnakangas
f7c908f2f0 more metrics 2025-05-12 01:01:50 +03:00
Heikki Linnakangas
86671e3a0b Add a bunch of metric counters 2025-05-11 20:11:13 +03:00
Heikki Linnakangas
319cd74f73 Fix eviction 2025-05-11 19:34:50 +03:00
Heikki Linnakangas
0efefbf77c Add a few metrics, fix page eviction 2025-05-10 03:13:28 +03:00
Heikki Linnakangas
e6a4171fa1 fix concurrency issues with the LFC
- Add another locking hash table to track which cached pages are currently being
  modified, by smgrwrite() or smgrread() or by prefetch.

- Use single-value Leaf pages in the art tree. That seems simpler after all,
  and it eliminates some corner cases where a Value needed to be cloned, which
  made it tricky to use atomics or other interior mutability on the Values
2025-05-10 02:36:48 +03:00
Heikki Linnakangas
0c25ea9e31 reduce LOG noise 2025-05-09 18:27:36 +03:00
Heikki Linnakangas
6692321026 Remove dependency on io_uring, use plain std::fs ops instead
io_uring is a great idea in the long term, but for now, let's make it
easier to develop locally on macos, where io_uring is not available.
2025-05-06 17:46:21 +03:00
Heikki Linnakangas
791df28755 Linked list fix and add unit test 2025-05-06 16:46:54 +03:00
Heikki Linnakangas
d20da994f4 git add missing file 2025-05-06 15:36:48 +03:00
Heikki Linnakangas
6dbbdaae73 run 'cargo fmt' 2025-05-06 15:35:56 +03:00
Heikki Linnakangas
977bc09d2a Bunch of fixes, smarter iterator, metrics exporter 2025-05-06 15:28:50 +03:00
Heikki Linnakangas
44269fcd5e Implement simple eviction and free block tracking 2025-05-06 15:28:15 +03:00
Heikki Linnakangas
44cc648dc8 Implement iterator over keys
the implementation is not very optimized, but probably good enough for an MVP
2025-05-06 15:27:38 +03:00
Heikki Linnakangas
884e028a4a implement deletion in art tree 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
42df3e5453 debugging stats 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
fc743e284f more work on allocators 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
d02f9a2139 Collect garbage, handle OOMs 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
083118e98e Implement epoch system 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
54cd2272f1 more memory allocation stuff 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
e40193e3c8 simple block-based allocator 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
ce9f7bacc1 Fix communicator client for recent changes in protocol and client code 2025-05-06 15:26:51 +03:00
Heikki Linnakangas
b7891f8fe8 Include 'neon-shard-id' header in client requests 2025-05-06 15:23:30 +03:00
Elizabeth Murray
5f2adaa9ad Remove some additional debug info messages. 2025-05-02 10:50:53 -07:00
Elizabeth Murray
3e5e396c8d Remove some debug info messages. 2025-05-02 10:24:18 -07:00
Elizabeth Murray
9d781c6fda Add a connection pool module to the grpc client. 2025-05-02 10:22:33 -07:00
Erik Grinaker
cf5d038472 service documentation 2025-05-02 15:20:12 +02:00
Erik Grinaker
d785100c02 page_api: add GetPageRequest::class 2025-05-02 10:48:32 +02:00
Erik Grinaker
2c0d930e3d page_api: add GetPageResponse::status 2025-04-30 16:48:45 +02:00
Erik Grinaker
66171a117b page_api: add GetPageRequestBatch 2025-04-30 15:31:11 +02:00
Erik Grinaker
df2806e7a0 page_api: add GetPageRequest::id 2025-04-30 15:00:16 +02:00
Erik Grinaker
07631692db page_api: protobuf comments 2025-04-30 12:36:11 +02:00
Erik Grinaker
4c77397943 Add neon-shard-id header 2025-04-30 11:18:06 +02:00
Erik Grinaker
7bb58be546 Use authorization header instead of neon-auth-token 2025-04-30 10:38:44 +02:00
Erik Grinaker
b5373de208 page_api: add get_slru_segment() 2025-04-29 17:59:27 +02:00
Erik Grinaker
b86c610f42 page_api: tweaks 2025-04-29 17:23:51 +02:00
Erik Grinaker
0f520d79ab pageserver: rename data_api to page_api 2025-04-29 15:58:52 +02:00
Heikki Linnakangas
93eb7bb6b8 include lots of changes that went missing by accident 2025-04-29 15:32:27 +03:00
Heikki Linnakangas
e58d0fece1 New communicator, with "integrated" cache accessible from all processes 2025-04-29 11:52:44 +03:00
611 changed files with 23069 additions and 37170 deletions

View File

@@ -21,20 +21,18 @@ platforms = [
# "x86_64-apple-darwin",
# "x86_64-pc-windows-msvc",
]
[final-excludes]
workspace-members = [
# vm_monitor benefits from the same Cargo.lock as the rest of our artifacts, but
# it is built primarly in separate repo neondatabase/autoscaling and thus is excluded
# from depending on workspace-hack because most of the dependencies are not used.
"vm_monitor",
# subzero-core is a stub crate that should be excluded from workspace-hack
"subzero-core",
# All of these exist in libs and are not usually built independently.
# Putting workspace hack there adds a bottleneck for cargo builds.
"compute_api",
"consumption_metrics",
"desim",
"json",
"metrics",
"pageserver_api",
"postgres_backend",

View File

@@ -27,4 +27,4 @@
!storage_controller/
!vendor/postgres-*/
!workspace_hack/
!build-tools/patches
!build_tools/patches

View File

@@ -4,11 +4,9 @@ self-hosted-runner:
- large
- large-arm64
- small
- small-debug
- small-metal
- small-arm64
- unit-perf
- unit-perf-aws-arm
- us-east-2
config-variables:
- AWS_ECR_REGION
@@ -32,7 +30,6 @@ config-variables:
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- PREWARM_PROJECT_ID
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID

View File

@@ -1,28 +0,0 @@
name: 'Prepare current job for subzero'
description: >
Set git token to access `neondatabase/subzero` from cargo build,
and set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable to use git CLI
inputs:
token:
description: 'GitHub token with access to neondatabase/subzero'
required: true
runs:
using: "composite"
steps:
- name: Set git token for neondatabase/subzero
uses: pyTooling/Actions/with-post-step@2307b526df64d55e95884e072e49aac2a00a9afa # v5.1.0
env:
SUBZERO_ACCESS_TOKEN: ${{ inputs.token }}
with:
main: |
git config --global url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a
post: |
git config --global --unset url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
- name: Set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable
shell: bash -euxo pipefail {0}
run: echo "CARGO_NET_GIT_FETCH_WITH_CLI=true" >> ${GITHUB_ENV}

View File

@@ -176,13 +176,7 @@ runs:
fi
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
# We don't use code coverage for regression tests (the step is disabled),
# so there's no need to collect it.
# Ref https://github.com/neondatabase/neon/issues/4540
# cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=()
# Explicitly set LLVM_PROFILE_FILE to /dev/null to avoid writing *.profraw files
export LLVM_PROFILE_FILE=/dev/null
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
else
cov_prefix=()
fi

View File

@@ -86,10 +86,6 @@ jobs:
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Set pg 14 revision for caching
id: pg_v14_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
@@ -120,7 +116,7 @@ jobs:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ inputs.sanitizers }}
run: |
CARGO_FLAGS="--locked --features testing,rest_broker"
CARGO_FLAGS="--locked --features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_PROFILE=""
@@ -154,7 +150,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v15 build
id: cache_pg_15
@@ -166,7 +162,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v16 build
id: cache_pg_16
@@ -178,7 +174,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v17 build
id: cache_pg_17
@@ -190,7 +186,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Build all
# Note: the Makefile picks up BUILD_TYPE and CARGO_PROFILE from the env variables
@@ -338,3 +334,67 @@ jobs:
- name: Merge and upload coverage data
if: inputs.build-type == 'debug'
uses: ./.github/actions/save-coverage-data
regress-tests:
# Don't run regression tests on debug arm64 builds
if: inputs.build-type != 'debug' || inputs.arch != 'arm64'
permissions:
id-token: write # aws-actions/configure-aws-credentials
contents: read
statuses: write
needs: [ build-neon ]
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large-metal')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# for changed limits, see comments on `options:` earlier in this file
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
strategy:
fail-fast: false
matrix: ${{ fromJSON(format('{{"include":{0}}}', inputs.test-cfg)) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- name: Pytest regression tests
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
uses: ./.github/actions/run-python-test-set
timeout-minutes: ${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 75 || 180 }}
with:
build_type: ${{ inputs.build-type }}
test_selection: regress
needs_postgres_source: true
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
rerun_failed: ${{ inputs.rerun-failed }}
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540
- name: Merge and upload coverage data
if: |
false &&
inputs.build-type == 'debug' && matrix.pg_version == 'v16'
uses: ./.github/actions/save-coverage-data

View File

@@ -46,10 +46,6 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Cache cargo deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0

View File

@@ -1,384 +0,0 @@
name: TPC-C like benchmark using benchbase
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 6 * * *' # run once a day at 6 AM UTC
workflow_dispatch: # adds ability to run this manually
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow globally because we do not want to be too noisy in production environment
group: benchbase-tpcc-workflow
cancel-in-progress: false
permissions:
contents: read
jobs:
benchbase-tpcc:
strategy:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
- warehouses: 50 # defines number of warehouses and is used to compute number of terminals
max_rate: 800 # measured max TPS at scale factor based on experiments. Adjust if performance is better/worse
min_cu: 0.25 # simulate free tier plan (0.25 -2 CU)
max_cu: 2
- warehouses: 500 # serverless plan (2-8 CU)
max_rate: 2000
min_cu: 2
max_cu: 8
- warehouses: 1000 # business plan (2-16 CU)
max_rate: 2900
min_cu: 2
max_cu: 16
max-parallel: 1 # we want to run each workload size sequentially to avoid noisy neighbors
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PG_CONFIG: /tmp/neon/pg_install/v17/bin/pg_config
PSQL: /tmp/neon/pg_install/v17/bin/psql
PG_17_LIB_PATH: /tmp/neon/pg_install/v17/lib
POSTGRES_VERSION: 17
runs-on: [ self-hosted, us-east-2, x64 ]
timeout-minutes: 1440
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials # necessary to download artefacts
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours is currently max associated with IAM role
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
id: create-neon-project-tpcc
uses: ./.github/actions/neon-project-create
with:
region_id: aws-us-east-2
postgres_version: ${{ env.POSTGRES_VERSION }}
compute_units: '[${{ matrix.min_cu }}, ${{ matrix.max_cu }}]'
api_key: ${{ secrets.NEON_PRODUCTION_API_KEY_4_BENCHMARKS }}
api_host: console.neon.tech # production (!)
- name: Initialize Neon project
env:
BENCHMARK_TPCC_CONNSTR: ${{ steps.create-neon-project-tpcc.outputs.dsn }}
PROJECT_ID: ${{ steps.create-neon-project-tpcc.outputs.project_id }}
run: |
echo "Initializing Neon project with project_id: ${PROJECT_ID}"
export LD_LIBRARY_PATH=${PG_17_LIB_PATH}
# Retry logic for psql connection with 1 minute sleep between attempts
for attempt in {1..3}; do
echo "Attempt ${attempt}/3: Creating extensions in Neon project"
if ${PSQL} "${BENCHMARK_TPCC_CONNSTR}" -c "CREATE EXTENSION IF NOT EXISTS neon; CREATE EXTENSION IF NOT EXISTS neon_utils;"; then
echo "Successfully created extensions"
break
else
echo "Failed to create extensions on attempt ${attempt}"
if [ ${attempt} -lt 3 ]; then
echo "Waiting 60 seconds before retry..."
sleep 60
else
echo "All attempts failed, exiting"
exit 1
fi
fi
done
echo "BENCHMARK_TPCC_CONNSTR=${BENCHMARK_TPCC_CONNSTR}" >> $GITHUB_ENV
- name: Generate BenchBase workload configuration
env:
WAREHOUSES: ${{ matrix.warehouses }}
MAX_RATE: ${{ matrix.max_rate }}
run: |
echo "Generating BenchBase configs for warehouses: ${WAREHOUSES}, max_rate: ${MAX_RATE}"
# Extract hostname and password from connection string
# Format: postgresql://username:password@hostname/database?params (no port for Neon)
HOSTNAME=$(echo "${BENCHMARK_TPCC_CONNSTR}" | sed -n 's|.*://[^:]*:[^@]*@\([^/]*\)/.*|\1|p')
PASSWORD=$(echo "${BENCHMARK_TPCC_CONNSTR}" | sed -n 's|.*://[^:]*:\([^@]*\)@.*|\1|p')
echo "Extracted hostname: ${HOSTNAME}"
# Use runner temp (NVMe) as working directory
cd "${RUNNER_TEMP}"
# Copy the generator script
cp "${GITHUB_WORKSPACE}/test_runner/performance/benchbase_tpc_c_helpers/generate_workload_size.py" .
# Generate configs and scripts
python3 generate_workload_size.py \
--warehouses ${WAREHOUSES} \
--max-rate ${MAX_RATE} \
--hostname ${HOSTNAME} \
--password ${PASSWORD} \
--runner-arch ${{ runner.arch }}
# Fix path mismatch: move generated configs and scripts to expected locations
mv ../configs ./configs
mv ../scripts ./scripts
- name: Prepare database (load data)
env:
WAREHOUSES: ${{ matrix.warehouses }}
run: |
cd "${RUNNER_TEMP}"
echo "Loading ${WAREHOUSES} warehouses into database..."
# Run the loader script and capture output to log file while preserving stdout/stderr
./scripts/load_${WAREHOUSES}_warehouses.sh 2>&1 | tee "load_${WAREHOUSES}_warehouses.log"
echo "Database loading completed"
- name: Run TPC-C benchmark (warmup phase, then benchmark at 70% of configuredmax TPS)
env:
WAREHOUSES: ${{ matrix.warehouses }}
run: |
cd "${RUNNER_TEMP}"
echo "Running TPC-C benchmark with ${WAREHOUSES} warehouses..."
# Run the optimal rate benchmark
./scripts/execute_${WAREHOUSES}_warehouses_opt_rate.sh
echo "Benchmark execution completed"
- name: Run TPC-C benchmark (warmup phase, then ramp down TPS and up again in 5 minute intervals)
env:
WAREHOUSES: ${{ matrix.warehouses }}
run: |
cd "${RUNNER_TEMP}"
echo "Running TPC-C ramp-down-up with ${WAREHOUSES} warehouses..."
# Run the optimal rate benchmark
./scripts/execute_${WAREHOUSES}_warehouses_ramp_up.sh
echo "Benchmark execution completed"
- name: Process results (upload to test results database and generate diagrams)
env:
WAREHOUSES: ${{ matrix.warehouses }}
MIN_CU: ${{ matrix.min_cu }}
MAX_CU: ${{ matrix.max_cu }}
PROJECT_ID: ${{ steps.create-neon-project-tpcc.outputs.project_id }}
REVISION: ${{ github.sha }}
PERF_DB_CONNSTR: ${{ secrets.PERF_TEST_RESULT_CONNSTR }}
run: |
cd "${RUNNER_TEMP}"
echo "Creating temporary Python environment for results processing..."
# Create temporary virtual environment
python3 -m venv temp_results_env
source temp_results_env/bin/activate
# Install required packages in virtual environment
pip install matplotlib pandas psycopg2-binary
echo "Copying results processing scripts..."
# Copy both processing scripts
cp "${GITHUB_WORKSPACE}/test_runner/performance/benchbase_tpc_c_helpers/generate_diagrams.py" .
cp "${GITHUB_WORKSPACE}/test_runner/performance/benchbase_tpc_c_helpers/upload_results_to_perf_test_results.py" .
echo "Processing load phase metrics..."
# Find and process load log
LOAD_LOG=$(find . -name "load_${WAREHOUSES}_warehouses.log" -type f | head -1)
if [ -n "$LOAD_LOG" ]; then
echo "Processing load metrics from: $LOAD_LOG"
python upload_results_to_perf_test_results.py \
--load-log "$LOAD_LOG" \
--run-type "load" \
--warehouses "${WAREHOUSES}" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Load log file not found: load_${WAREHOUSES}_warehouses.log"
fi
echo "Processing warmup results for optimal rate..."
# Find and process warmup results
WARMUP_CSV=$(find results_warmup -name "*.results.csv" -type f | head -1)
WARMUP_JSON=$(find results_warmup -name "*.summary.json" -type f | head -1)
if [ -n "$WARMUP_CSV" ] && [ -n "$WARMUP_JSON" ]; then
echo "Generating warmup diagram from: $WARMUP_CSV"
python generate_diagrams.py \
--input-csv "$WARMUP_CSV" \
--output-svg "warmup_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "Warmup at max TPS"
echo "Uploading warmup metrics from: $WARMUP_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$WARMUP_JSON" \
--results-csv "$WARMUP_CSV" \
--run-type "warmup" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing warmup results files (CSV: $WARMUP_CSV, JSON: $WARMUP_JSON)"
fi
echo "Processing optimal rate results..."
# Find and process optimal rate results
OPTRATE_CSV=$(find results_opt_rate -name "*.results.csv" -type f | head -1)
OPTRATE_JSON=$(find results_opt_rate -name "*.summary.json" -type f | head -1)
if [ -n "$OPTRATE_CSV" ] && [ -n "$OPTRATE_JSON" ]; then
echo "Generating optimal rate diagram from: $OPTRATE_CSV"
python generate_diagrams.py \
--input-csv "$OPTRATE_CSV" \
--output-svg "benchmark_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "70% of max TPS"
echo "Uploading optimal rate metrics from: $OPTRATE_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$OPTRATE_JSON" \
--results-csv "$OPTRATE_CSV" \
--run-type "opt-rate" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing optimal rate results files (CSV: $OPTRATE_CSV, JSON: $OPTRATE_JSON)"
fi
echo "Processing warmup 2 results for ramp down/up phase..."
# Find and process warmup results
WARMUP_CSV=$(find results_warmup -name "*.results.csv" -type f | tail -1)
WARMUP_JSON=$(find results_warmup -name "*.summary.json" -type f | tail -1)
if [ -n "$WARMUP_CSV" ] && [ -n "$WARMUP_JSON" ]; then
echo "Generating warmup diagram from: $WARMUP_CSV"
python generate_diagrams.py \
--input-csv "$WARMUP_CSV" \
--output-svg "warmup_2_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "Warmup at max TPS"
echo "Uploading warmup metrics from: $WARMUP_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$WARMUP_JSON" \
--results-csv "$WARMUP_CSV" \
--run-type "warmup" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing warmup results files (CSV: $WARMUP_CSV, JSON: $WARMUP_JSON)"
fi
echo "Processing ramp results..."
# Find and process ramp results
RAMPUP_CSV=$(find results_ramp_up -name "*.results.csv" -type f | head -1)
RAMPUP_JSON=$(find results_ramp_up -name "*.summary.json" -type f | head -1)
if [ -n "$RAMPUP_CSV" ] && [ -n "$RAMPUP_JSON" ]; then
echo "Generating ramp diagram from: $RAMPUP_CSV"
python generate_diagrams.py \
--input-csv "$RAMPUP_CSV" \
--output-svg "ramp_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "ramp TPS down and up in 5 minute intervals"
echo "Uploading ramp metrics from: $RAMPUP_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$RAMPUP_JSON" \
--results-csv "$RAMPUP_CSV" \
--run-type "ramp-up" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing ramp results files (CSV: $RAMPUP_CSV, JSON: $RAMPUP_JSON)"
fi
# Deactivate and clean up virtual environment
deactivate
rm -rf temp_results_env
rm upload_results_to_perf_test_results.py
echo "Results processing completed and environment cleaned up"
- name: Set date for upload
id: set-date
run: echo "date=$(date +%Y-%m-%d)" >> $GITHUB_OUTPUT
- name: Configure AWS credentials # necessary to upload results
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: us-east-2
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 900 # 900 is minimum value
- name: Upload benchmark results to S3
env:
S3_BUCKET: neon-public-benchmark-results
S3_PREFIX: benchbase-tpc-c/${{ steps.set-date.outputs.date }}/${{ github.run_id }}/${{ matrix.warehouses }}-warehouses
run: |
echo "Redacting passwords from configuration files before upload..."
# Mask all passwords in XML config files
find "${RUNNER_TEMP}/configs" -name "*.xml" -type f -exec sed -i 's|<password>[^<]*</password>|<password>redacted</password>|g' {} \;
echo "Uploading benchmark results to s3://${S3_BUCKET}/${S3_PREFIX}/"
# Upload the entire benchmark directory recursively
aws s3 cp --only-show-errors --recursive "${RUNNER_TEMP}" s3://${S3_BUCKET}/${S3_PREFIX}/
echo "Upload completed"
- name: Delete Neon Project
if: ${{ always() }}
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project-tpcc.outputs.project_id }}
api_key: ${{ secrets.NEON_PRODUCTION_API_KEY_4_BENCHMARKS }}
api_host: console.neon.tech # production (!)

View File

@@ -219,7 +219,6 @@ jobs:
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
--ignore test_runner/performance/test_perf_many_relations.py
--ignore test_runner/performance/test_perf_oltp_large_tenant.py
--ignore test_runner/performance/test_lfc_prewarm.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -411,77 +410,6 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
prewarm-test:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PROJECT_ID: ${{ vars.PREWARM_PROJECT_ID }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run prewarm benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_lfc_prewarm.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
generate-matrices:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)

View File

@@ -72,7 +72,7 @@ jobs:
ARCHS: ${{ inputs.archs || '["x64","arm64"]' }}
DEBIANS: ${{ inputs.debians || '["bullseye","bookworm"]' }}
IMAGE_TAG: |
${{ hashFiles('build-tools/Dockerfile',
${{ hashFiles('build-tools.Dockerfile',
'.github/workflows/build-build-tools-image.yml') }}
run: |
echo "archs=${ARCHS}" | tee -a ${GITHUB_OUTPUT}
@@ -144,11 +144,9 @@ jobs:
- uses: docker/build-push-action@471d1dc4e07e5cdedd4c2171150001c434f0b7a4 # v6.15.0
with:
file: build-tools/Dockerfile
file: build-tools.Dockerfile
context: .
attests: |
type=provenance,mode=max
type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1
provenance: false
push: true
pull: true
build-args: |

View File

@@ -32,14 +32,162 @@ permissions:
contents: read
jobs:
make-all:
build-pgxn:
if: |
inputs.pg_versions != '[]' || inputs.rebuild_everything ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
timeout-minutes: 30
runs-on: macos-15
strategy:
matrix:
postgres-version: ${{ inputs.rebuild_everything && fromJSON('["v14", "v15", "v16", "v17"]') || fromJSON(inputs.pg_versions) }}
env:
# Use release build only, to have less debug info around
# Hence keeping target/ (and general cache size) smaller
BUILD_TYPE: release
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- name: Checkout main repo
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set pg ${{ matrix.postgres-version }} for caching
id: pg_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-${{ matrix.postgres-version }}) | tee -a "${GITHUB_OUTPUT}"
- name: Cache postgres ${{ matrix.postgres-version }} build
id: cache_pg
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
path: pg_install/${{ matrix.postgres-version }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Checkout submodule vendor/postgres-${{ matrix.postgres-version }}
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
git submodule init vendor/postgres-${{ matrix.postgres-version }}
git submodule update --depth 1 --recursive
- name: Install build dependencies
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
brew install flex bison openssl protobuf icu4c
- name: Set extra env for macOS
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Build Postgres ${{ matrix.postgres-version }}
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
make postgres-${{ matrix.postgres-version }} -j$(sysctl -n hw.ncpu)
- name: Build Neon Pg Ext ${{ matrix.postgres-version }}
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
make "neon-pg-ext-${{ matrix.postgres-version }}" -j$(sysctl -n hw.ncpu)
- name: Upload "pg_install/${{ matrix.postgres-version }}" artifact
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: pg_install--${{ matrix.postgres-version }}
path: pg_install/${{ matrix.postgres-version }}
# The artifact is supposed to be used by the next job in the same workflow,
# so theres no need to store it for too long.
retention-days: 1
build-walproposer-lib:
if: |
contains(inputs.pg_versions, 'v17') || inputs.rebuild_everything ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
timeout-minutes: 30
runs-on: macos-15
needs: [build-pgxn]
env:
# Use release build only, to have less debug info around
# Hence keeping target/ (and general cache size) smaller
BUILD_TYPE: release
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- name: Checkout main repo
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set pg v17 for caching
id: pg_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) | tee -a "${GITHUB_OUTPUT}"
- name: Download "pg_install/v17" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
name: pg_install--v17
path: pg_install/v17
# `actions/download-artifact` doesn't preserve permissions:
# https://github.com/actions/download-artifact?tab=readme-ov-file#permission-loss
- name: Make pg_install/v*/bin/* executable
run: |
chmod +x pg_install/v*/bin/*
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
path: build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Checkout submodule vendor/postgres-v17
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
run: |
git submodule init vendor/postgres-v17
git submodule update --depth 1 --recursive
- name: Install build dependencies
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
run: |
brew install flex bison openssl protobuf icu4c
- name: Set extra env for macOS
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
run: |
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Build walproposer-lib (only for v17)
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
run:
make walproposer-lib -j$(sysctl -n hw.ncpu) PG_INSTALL_CACHED=1
- name: Upload "build/walproposer-lib" artifact
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: build--walproposer-lib
path: build/walproposer-lib
# The artifact is supposed to be used by the next job in the same workflow,
# so theres no need to store it for too long.
retention-days: 1
cargo-build:
if: |
inputs.pg_versions != '[]' || inputs.rebuild_rust_code || inputs.rebuild_everything ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
timeout-minutes: 60
timeout-minutes: 30
runs-on: macos-15
needs: [build-pgxn, build-walproposer-lib]
env:
# Use release build only, to have less debug info around
# Hence keeping target/ (and general cache size) smaller
@@ -54,58 +202,42 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
- name: Download "pg_install/v14" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
name: pg_install--v14
path: pg_install/v14
- name: Install build dependencies
run: |
brew install flex bison openssl protobuf icu4c
- name: Set extra env for macOS
run: |
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Restore "pg_install/" cache
id: cache_pg
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
- name: Download "pg_install/v15" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
path: pg_install
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-install-v14-${{ hashFiles('Makefile', 'postgres.mk', 'vendor/revisions.json') }}
name: pg_install--v15
path: pg_install/v15
- name: Checkout vendor/postgres submodules
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
git submodule init
git submodule update --depth 1 --recursive
- name: Download "pg_install/v16" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
name: pg_install--v16
path: pg_install/v16
- name: Build Postgres
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
make postgres -j$(sysctl -n hw.ncpu)
- name: Download "pg_install/v17" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
name: pg_install--v17
path: pg_install/v17
# This isn't strictly necessary, but it makes the cached and non-cached builds more similar,
# When pg_install is restored from cache, there is no 'build/' directory. By removing it
# in a non-cached build too, we enforce that the rest of the steps don't depend on it,
# so that we notice any build caching bugs earlier.
- name: Remove build artifacts
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
rm -rf build
- name: Download "build/walproposer-lib" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
name: build--walproposer-lib
path: build/walproposer-lib
# Explicitly update the rust toolchain before running 'make'. The parallel make build can
# invoke 'cargo build' more than once in parallel, for different crates. That's OK, 'cargo'
# does its own locking to prevent concurrent builds from stepping on each other's
# toes. However, it will first try to update the toolchain, and that step is not locked the
# same way. To avoid two toolchain updates running in parallel and stepping on each other's
# toes, ensure that the toolchain is up-to-date beforehand.
- name: Update rust toolchain
# `actions/download-artifact` doesn't preserve permissions:
# https://github.com/actions/download-artifact?tab=readme-ov-file#permission-loss
- name: Make pg_install/v*/bin/* executable
run: |
rustup --version &&
rustup update &&
rustup show
chmod +x pg_install/v*/bin/*
- name: Cache cargo deps
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
@@ -117,12 +249,17 @@ jobs:
target
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
# Build the neon-specific postgres extensions, and all the Rust bits.
#
# Pass PG_INSTALL_CACHED=1 because PostgreSQL was already built and cached
# separately.
- name: Build all
run: PG_INSTALL_CACHED=1 BUILD_TYPE=release make -j$(sysctl -n hw.ncpu) all
- name: Install build dependencies
run: |
brew install flex bison openssl protobuf icu4c
- name: Set extra env for macOS
run: |
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Run cargo build
run: cargo build --all --release -j$(sysctl -n hw.ncpu)
- name: Check that no warnings are produced
run: ./run_clippy.sh

File diff suppressed because it is too large Load Diff

View File

@@ -72,7 +72,6 @@ jobs:
check-macos-build:
needs: [ check-permissions, files-changed ]
uses: ./.github/workflows/build-macos.yml
secrets: inherit
with:
pg_versions: ${{ needs.files-changed.outputs.postgres_changes }}
rebuild_rust_code: ${{ fromJSON(needs.files-changed.outputs.rebuild_rust_code) }}

View File

@@ -1,4 +1,4 @@
name: Periodic pagebench performance test on unit-perf-aws-arm runners
name: Periodic pagebench performance test on unit-perf hetzner runner
on:
schedule:
@@ -40,7 +40,7 @@ jobs:
statuses: write
contents: write
pull-requests: write
runs-on: [ self-hosted, unit-perf-aws-arm ]
runs-on: [ self-hosted, unit-perf ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:

View File

@@ -48,20 +48,8 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
generate-ch-tmppw:
runs-on: ubuntu-22.04
outputs:
tmp_val: ${{ steps.pwgen.outputs.tmp_val }}
steps:
- name: Generate a random password
id: pwgen
run: |
set +x
p=$(dd if=/dev/random bs=14 count=1 2>/dev/null | base64)
echo tmp_val="${p//\//}" >> "${GITHUB_OUTPUT}"
test-logical-replication:
needs: [ build-build-tools-image, generate-ch-tmppw ]
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04
container:
@@ -72,21 +60,16 @@ jobs:
options: --init --user root
services:
clickhouse:
image: clickhouse/clickhouse-server:25.6
env:
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
PGSSLCERT: /tmp/postgresql.crt
image: clickhouse/clickhouse-server:24.6.3.64
ports:
- 9000:9000
- 8123:8123
zookeeper:
image: quay.io/debezium/zookeeper:3.1.3.Final
image: quay.io/debezium/zookeeper:2.7
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:3.1.3.Final
image: quay.io/debezium/kafka:2.7
env:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
@@ -96,7 +79,7 @@ jobs:
ports:
- 9092:9092
debezium:
image: quay.io/debezium/connect:3.1.3.Final
image: quay.io/debezium/connect:2.7
env:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
@@ -142,7 +125,6 @@ jobs:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
- name: Delete Neon Project
if: always()

View File

@@ -1,9 +1,9 @@
name: Periodic proxy performance test on unit-perf-aws-arm runners
name: Periodic proxy performance test on unit-perf hetzner runner
on:
push: # TODO: remove after testing
branches:
- test-proxy-bench # Runs on pushes to test-proxy-bench branch
- test-proxy-bench # Runs on pushes to branches starting with test-proxy-bench
# schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
@@ -32,7 +32,7 @@ jobs:
statuses: write
contents: write
pull-requests: write
runs-on: [ self-hosted, unit-perf-aws-arm ]
runs-on: [self-hosted, unit-perf]
timeout-minutes: 60 # 1h timeout
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
@@ -55,58 +55,30 @@ jobs:
{
echo "PROXY_BENCH_PATH=$PROXY_BENCH_PATH"
echo "NEON_DIR=${RUNNER_TEMP}/neon"
echo "NEON_PROXY_PATH=${RUNNER_TEMP}/neon/bin/proxy"
echo "TEST_OUTPUT=${PROXY_BENCH_PATH}/test_output"
echo ""
} >> "$GITHUB_ENV"
- name: Cache poetry deps
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: show ulimits
shell: bash -euxo pipefail {0}
run: |
ulimit -a
- name: Run proxy-bench
working-directory: ${{ env.PROXY_BENCH_PATH }}
run: ./run.sh --with-grafana --bare-metal
run: ${PROXY_BENCH_PATH}/run.sh
- name: Ingest Bench Results
- name: Ingest Bench Results # neon repo script
if: always()
working-directory: ${{ env.NEON_DIR }}
run: |
mkdir -p $TEST_OUTPUT
python $NEON_DIR/scripts/proxy_bench_results_ingest.py --out $TEST_OUTPUT
- name: Push Metrics to Proxy perf database
shell: bash -euxo pipefail {0}
if: always()
env:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PROXY_TEST_RESULT_CONNSTR }}"
REPORT_FROM: $TEST_OUTPUT
working-directory: ${{ env.NEON_DIR }}
run: $NEON_DIR/scripts/generate_and_push_perf_report.sh
- name: Docker cleanup
if: always()
run: docker compose down
- name: Notify Failure
if: failure()
run: echo "Proxy bench job failed" && exit 1
- name: Cleanup Test Resources
if: always()
shell: bash -euxo pipefail {0}
run: |
# Cleanup the test resources
if [[ -d "${TEST_OUTPUT}" ]]; then
rm -rf ${TEST_OUTPUT}
fi
if [[ -d "${PROXY_BENCH_PATH}/test_output" ]]; then
rm -rf ${PROXY_BENCH_PATH}/test_output
fi
run: echo "Proxy bench job failed" && exit 1

11
.gitignore vendored
View File

@@ -6,7 +6,6 @@
/tmp_check_cli
__pycache__/
test_output/
neon_previous/
.vscode
.idea
*.swp
@@ -15,7 +14,7 @@ neon.iml
/.neon
/integration_tests/.neon
compaction-suite-results.*
docker-compose/docker-compose-parallel.yml
pgxn/neon/communicator/communicator_bindings.h
# Coverage
*.profraw
@@ -26,14 +25,6 @@ docker-compose/docker-compose-parallel.yml
*.o
*.so
*.Po
*.pid
# pgindent typedef lists
*.list
# Node
**/node_modules/
# various files for local testing
/proxy/.subzero
local_proxy.json

8
.gitmodules vendored
View File

@@ -1,16 +1,16 @@
[submodule "vendor/postgres-v14"]
path = vendor/postgres-v14
url = ../postgres.git
url = https://github.com/neondatabase/postgres.git
branch = REL_14_STABLE_neon
[submodule "vendor/postgres-v15"]
path = vendor/postgres-v15
url = ../postgres.git
url = https://github.com/neondatabase/postgres.git
branch = REL_15_STABLE_neon
[submodule "vendor/postgres-v16"]
path = vendor/postgres-v16
url = ../postgres.git
url = https://github.com/neondatabase/postgres.git
branch = REL_16_STABLE_neon
[submodule "vendor/postgres-v17"]
path = vendor/postgres-v17
url = ../postgres.git
url = https://github.com/neondatabase/postgres.git
branch = REL_17_STABLE_neon

867
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -35,6 +35,7 @@ members = [
"libs/pq_proto",
"libs/tenant_size_model",
"libs/metrics",
"libs/neonart",
"libs/postgres_connection",
"libs/remote_storage",
"libs/tracing-utils",
@@ -43,10 +44,8 @@ members = [
"libs/walproposer",
"libs/wal_decoder",
"libs/postgres_initdb",
"libs/proxy/json",
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/subzero_core",
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"pgxn/neon/communicator",
@@ -93,6 +92,7 @@ clap = { version = "4.0", features = ["derive", "env"] }
clashmap = { version = "1.0", features = ["raw-api"] }
comfy-table = "7.1"
const_format = "0.2"
crossbeam-utils = "0.8.21"
crc32c = "0.6"
diatomic-waker = { version = "0.2.3" }
either = "1.8"
@@ -131,11 +131,10 @@ jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] }
jsonwebtoken = "9"
lasso = "0.7"
libc = "0.2"
lock_api = "0.4.13"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
moka = { version = "0.12", features = ["sync"] }
memoffset = "0.9"
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
# Do not update to >= 7.0.0, at least. The update will have a significant impact
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
@@ -143,33 +142,32 @@ notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.19"
once_cell = "1.13"
opentelemetry = "0.30"
opentelemetry_sdk = "0.30"
opentelemetry-otlp = { version = "0.30", default-features = false, features = ["http-proto", "trace", "http", "reqwest-blocking-client"] }
opentelemetry-semantic-conventions = "0.30"
opentelemetry = "0.27"
opentelemetry_sdk = "0.27"
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.27"
parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pem = "3.0.3"
peekable = "0.3.0"
pin-project-lite = "0.2"
pprof = { version = "0.14", features = ["criterion", "flamegraph", "frame-pointer", "prost-codec"] }
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13.5"
prost-types = "0.13.5"
rand = "0.9"
# Remove after p256 is updated to 0.14.
rand_core = "=0.6"
rand = "0.8"
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_30"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
reqwest-middleware = "0.4"
reqwest-retry = "0.7"
routerify = "3"
rpds = "0.13"
rustc-hash = "2.1.1"
rustc-hash = "1.1.0"
rustls = { version = "0.23.16", default-features = false }
rustls-pemfile = "2"
rustls-pki-types = "1.11"
@@ -190,6 +188,7 @@ smallvec = "1.11"
smol_str = { version = "0.2.0", features = ["serde"] }
socket2 = "0.5"
spki = "0.7.3"
spin = "0.9.8"
strum = "0.26"
strum_macros = "0.26"
"subtle" = "2.5.0"
@@ -201,11 +200,10 @@ thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.43.1", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] }
toml = "0.8"
@@ -214,15 +212,17 @@ tonic = { version = "0.13.1", default-features = false, features = ["channel", "
tonic-reflection = { version = "0.13.1", features = ["server"] }
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
tower-otel = { version = "0.6", features = ["axum"] }
# This revision uses opentelemetry 0.27. There's no tag for it.
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"
tracing-log = "0.2"
tracing-opentelemetry = "0.31"
tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
tracing-appender = "0.2.3"
try-lock = "0.2.5"
test-log = { version = "0.2.17", default-features = false, features = ["log"] }
twox-hash = { version = "1.6.3", default-features = false }
@@ -233,15 +233,17 @@ uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
rustls-native-certs = "0.8"
whoami = "1.5.1"
zerocopy = { version = "0.8", features = ["derive", "simd"] }
json-structural-diff = { version = "0.2.0" }
x509-cert = { version = "0.2.5" }
zerocopy = { version = "0.8", features = ["derive", "simd"] }
zeroize = "1.8"
## TODO replace this with tracing
env_logger = "0.11"
log = "0.4"
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
uring-common = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
@@ -261,6 +263,7 @@ desim = { version = "0.1", path = "./libs/desim" }
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
neonart = { version = "0.1", path = "./libs/neonart/" }
neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
@@ -292,7 +295,7 @@ walproposer = { version = "0.1", path = "./libs/walproposer/" }
workspace_hack = { version = "0.1", path = "./workspace_hack/" }
## Build dependencies
cbindgen = "0.29.0"
cbindgen = "0.28.0"
criterion = "0.5.1"
rcgen = "0.13"
rstest = "0.18"

View File

@@ -30,18 +30,7 @@ ARG BASE_IMAGE_SHA=debian:${DEBIAN_FLAVOR}
ARG BASE_IMAGE_SHA=${BASE_IMAGE_SHA/debian:bookworm-slim/debian@$BOOKWORM_SLIM_SHA}
ARG BASE_IMAGE_SHA=${BASE_IMAGE_SHA/debian:bullseye-slim/debian@$BULLSEYE_SLIM_SHA}
# Naive way:
#
# 1. COPY . .
# 1. make neon-pg-ext
# 2. cargo build <storage binaries>
#
# But to enable docker to cache intermediate layers, we perform a few preparatory steps:
#
# - Build all postgres versions, depending on just the contents of vendor/
# - Use cargo chef to build all rust dependencies
# 1. Build all postgres versions
# Build Postgres
FROM $REPOSITORY/$IMAGE:$TAG AS pg-build
WORKDIR /home/nonroot
@@ -49,62 +38,45 @@ COPY --chown=nonroot vendor/postgres-v14 vendor/postgres-v14
COPY --chown=nonroot vendor/postgres-v15 vendor/postgres-v15
COPY --chown=nonroot vendor/postgres-v16 vendor/postgres-v16
COPY --chown=nonroot vendor/postgres-v17 vendor/postgres-v17
COPY --chown=nonroot pgxn pgxn
COPY --chown=nonroot Makefile Makefile
COPY --chown=nonroot postgres.mk postgres.mk
COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
ENV BUILD_TYPE=release
RUN set -e \
&& mold -run make -j $(nproc) -s postgres
&& mold -run make -j $(nproc) -s neon-pg-ext \
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
# 2. Prepare cargo-chef recipe
# Prepare cargo-chef recipe
FROM $REPOSITORY/$IMAGE:$TAG AS plan
WORKDIR /home/nonroot
COPY --chown=nonroot . .
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero" && \
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a; \
fi \
&& cargo chef prepare --recipe-path recipe.json
RUN cargo chef prepare --recipe-path recipe.json
# Main build image
# Build neon binaries
FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG ADDITIONAL_RUSTFLAGS=""
ENV CARGO_FEATURES="default"
# 3. Build cargo dependencies. Note that this step doesn't depend on anything else than
# `recipe.json`, so the layer can be reused as long as none of the dependencies change.
COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
COPY --from=plan /home/nonroot/recipe.json recipe.json
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"; \
fi \
ARG ADDITIONAL_RUSTFLAGS=""
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo chef cook --locked --release --recipe-path recipe.json
# Perform the main build. We reuse the Postgres build artifacts from the intermediate 'pg-build'
# layer, and the cargo dependencies built in the previous step.
COPY --chown=nonroot --from=pg-build /home/nonroot/pg_install/ pg_install
COPY --chown=nonroot . .
COPY --chown=nonroot --from=plan /home/nonroot/proxy/Cargo.toml proxy/Cargo.toml
COPY --chown=nonroot --from=plan /home/nonroot/Cargo.lock Cargo.lock
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_FEATURES="rest_broker"; \
fi \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo auditable build \
--features $CARGO_FEATURES \
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \
@@ -115,10 +87,10 @@ RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
--bin endpoint_storage \
--bin neon_local \
--bin storage_scrubber \
--locked --release \
&& mold -run make -j $(nproc) -s neon-pg-ext
--locked --release
# Assemble the final image
# Build final image
#
FROM $BASE_IMAGE_SHA
WORKDIR /data
@@ -158,15 +130,12 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy
COPY --from=build --chown=neon:neon /home/nonroot/target/release/endpoint_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin
COPY --from=build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=build /home/nonroot/pg_install/v15 /usr/local/v15/
COPY --from=build /home/nonroot/pg_install/v16 /usr/local/v16/
COPY --from=build /home/nonroot/pg_install/v17 /usr/local/v17/
# Deprecated: Old deployment scripts use this tarball which contains all the Postgres binaries.
# That's obsolete, since all the same files are also present under /usr/local/v*. But to keep the
# old scripts working for now, create the tarball.
RUN tar -C /usr/local -cvzf /data/postgres_install.tar.gz v14 v15 v16 v17
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/
COPY --from=pg-build /home/nonroot/pg_install/v16 /usr/local/v16/
COPY --from=pg-build /home/nonroot/pg_install/v17 /usr/local/v17/
COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/
# By default, pageserver uses `.neon/` working directory in WORKDIR, so create one and fill it with the dummy config.
# Now, when `docker run ... pageserver` is run, it can start without errors, yet will have some default dummy values.

View File

@@ -2,7 +2,7 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# Where to install Postgres, default is ./pg_install, maybe useful for package
# managers.
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
# Supported PostgreSQL versions
POSTGRES_VERSIONS = v17 v16 v15 v14
@@ -14,7 +14,7 @@ POSTGRES_VERSIONS = v17 v16 v15 v14
# it is derived from BUILD_TYPE.
# All intermediate build artifacts are stored here.
BUILD_DIR := $(ROOT_PROJECT_DIR)/build
BUILD_DIR := build
ICU_PREFIX_DIR := /usr/local/icu
@@ -109,7 +109,7 @@ all: neon postgres-install neon-pg-ext
### Neon Rust bits
#
# The 'postgres_ffi' crate depends on the Postgres headers.
# The 'postgres_ffi' depends on the Postgres headers.
.PHONY: neon
neon: postgres-headers-install walproposer-lib cargo-target-dir
+@echo "Compiling Neon"
@@ -122,7 +122,7 @@ cargo-target-dir:
test -e target/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > target/CACHEDIR.TAG
.PHONY: neon-pg-ext-%
neon-pg-ext-%: postgres-install-% cargo-target-dir
neon-pg-ext-%: postgres-install-%
+@echo "Compiling neon-specific Postgres extensions for $*"
mkdir -p $(BUILD_DIR)/pgxn-$*
$(MAKE) PG_CONFIG="$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config" COPT='$(COPT)' \
@@ -212,7 +212,7 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
FIND_TYPEDEF=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/find_typedef \
INDENT=$(BUILD_DIR)/v17/src/tools/pg_bsd_indent/pg_bsd_indent \
PGINDENT_SCRIPT=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/pgindent/pgindent \
-C $(BUILD_DIR)/pgxn-v17/neon \
-C $(BUILD_DIR)/neon-v17 \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile pgindent
@@ -220,19 +220,6 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
setup-pre-commit-hook:
ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit
build-tools/node_modules: build-tools/package.json
cd build-tools && $(if $(CI),npm ci,npm install)
touch build-tools/node_modules
.PHONY: lint-openapi-spec
lint-openapi-spec: build-tools/node_modules
# operation-2xx-response: pageserver timeline delete returns 404 on success
find . -iname "openapi_spec.y*ml" -exec\
npx --prefix=build-tools/ redocly\
--skip-rule=operation-operationId --skip-rule=operation-summary --extends=minimal\
--skip-rule=no-server-example.com --skip-rule=operation-2xx-response\
lint {} \+
# Targets for building PostgreSQL are defined in postgres.mk.
#
# But if the caller has indicated that PostgreSQL is already

View File

@@ -35,17 +35,17 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused=on\ntimeout=15\ntries=5\nretry-on-host-error=on\n" > /root/.wgetrc && \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
COPY build-tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
COPY build_tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \
apt-get update && \
apt-get install -y --no-install-recommends \
apt update && \
apt install -y --no-install-recommends \
ca-certificates wget gpg && \
wget -qO - https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor -o /usr/share/keyrings/postgresql-keyring.gpg && \
echo "deb [signed-by=/usr/share/keyrings/postgresql-keyring.gpg] http://apt.postgresql.org/pub/repos/apt bookworm-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
apt-get update && \
apt-get install -y --no-install-recommends \
apt install -y --no-install-recommends \
build-essential \
autotools-dev \
libedit-dev \
@@ -89,7 +89,8 @@ RUN useradd -ms /bin/bash nonroot -b /home
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
RUN mkdir -p /pgcopydb/{bin,lib} && \
RUN mkdir -p /pgcopydb/bin && \
mkdir -p /pgcopydb/lib && \
chmod -R 755 /pgcopydb && \
chown -R nonroot:nonroot /pgcopydb
@@ -105,8 +106,8 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
# 'gdb' is included so that we get backtraces of core dumps produced in
# regression tests
RUN set -e \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
&& apt update \
&& apt install -y \
autoconf \
automake \
bison \
@@ -182,22 +183,16 @@ RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/
ENV LLVM_VERSION=20
RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& echo "deb http://apt.llvm.org/${DEBIAN_VERSION}/ llvm-toolchain-${DEBIAN_VERSION}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends clang-${LLVM_VERSION} llvm-${LLVM_VERSION} \
&& apt update \
&& apt install -y clang-${LLVM_VERSION} llvm-${LLVM_VERSION} \
&& bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install node
ENV NODE_VERSION=24
RUN curl -fsSL https://deb.nodesource.com/setup_${NODE_VERSION}.x | bash - \
&& apt-get install -y --no-install-recommends nodejs \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install docker
RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION} stable" > /etc/apt/sources.list.d/docker.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends docker-ce docker-ce-cli \
&& apt update \
&& apt install -y docker-ce docker-ce-cli \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Configure sudo & docker
@@ -214,11 +209,12 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
# Mold: A Modern Linker
ENV MOLD_VERSION=v2.37.1
RUN set -e \
&& git clone -b "${MOLD_VERSION}" --depth 1 https://github.com/rui314/mold.git \
&& git clone https://github.com/rui314/mold.git \
&& mkdir mold/build \
&& cd mold/build \
&& cd mold/build \
&& git checkout ${MOLD_VERSION} \
&& cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang++ .. \
&& cmake --build . -j "$(nproc)" \
&& cmake --build . -j $(nproc) \
&& cmake --install . \
&& cd .. \
&& rm -rf mold
@@ -252,7 +248,7 @@ ENV ICU_VERSION=67.1
ENV ICU_PREFIX=/usr/local/icu
# Download and build static ICU
RUN wget -O "/tmp/libicu-${ICU_VERSION}.tgz" https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
echo "94a80cd6f251a53bd2a997f6f1b5ac6653fe791dfab66e1eb0227740fb86d5dc /tmp/libicu-${ICU_VERSION}.tgz" | sha256sum --check && \
mkdir /tmp/icu && \
pushd /tmp/icu && \
@@ -263,7 +259,8 @@ RUN wget -O "/tmp/libicu-${ICU_VERSION}.tgz" https://github.com/unicode-org/icu/
make install && \
popd && \
rm -rf icu && \
rm -f /tmp/libicu-${ICU_VERSION}.tgz
rm -f /tmp/libicu-${ICU_VERSION}.tgz && \
popd
# Switch to nonroot user
USER nonroot:nonroot
@@ -276,19 +273,19 @@ ENV PYTHON_VERSION=3.11.12 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \
&& cd "$HOME" \
&& cd $HOME \
&& curl -sSO https://raw.githubusercontent.com/pyenv/pyenv-installer/master/bin/pyenv-installer \
&& chmod +x pyenv-installer \
&& ./pyenv-installer \
&& export PYENV_ROOT=/home/nonroot/.pyenv \
&& export PATH="$PYENV_ROOT/bin:$PATH" \
&& export PATH="$PYENV_ROOT/shims:$PATH" \
&& pyenv install "${PYTHON_VERSION}" \
&& pyenv global "${PYTHON_VERSION}" \
&& pyenv install ${PYTHON_VERSION} \
&& pyenv global ${PYTHON_VERSION} \
&& python --version \
&& pip install --no-cache-dir --upgrade pip \
&& pip install --upgrade pip \
&& pip --version \
&& pip install --no-cache-dir pipenv wheel poetry
&& pip install pipenv wheel poetry
# Switch to nonroot user (again)
USER nonroot:nonroot
@@ -299,7 +296,6 @@ WORKDIR /home/nonroot
ENV RUSTC_VERSION=1.88.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG CARGO_AUDITABLE_VERSION=0.7.0
ARG RUSTFILT_VERSION=0.2.1
ARG CARGO_HAKARI_VERSION=0.9.36
ARG CARGO_DENY_VERSION=0.18.2
@@ -315,16 +311,14 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools rustfmt clippy && \
cargo install cargo-auditable --locked --version "${CARGO_AUDITABLE_VERSION}" && \
cargo auditable install cargo-auditable --locked --version "${CARGO_AUDITABLE_VERSION}" --force && \
cargo auditable install rustfilt --version "${RUSTFILT_VERSION}" && \
cargo auditable install cargo-hakari --locked --version "${CARGO_HAKARI_VERSION}" && \
cargo auditable install cargo-deny --locked --version "${CARGO_DENY_VERSION}" && \
cargo auditable install cargo-hack --locked --version "${CARGO_HACK_VERSION}" && \
cargo auditable install cargo-nextest --locked --version "${CARGO_NEXTEST_VERSION}" && \
cargo auditable install cargo-chef --locked --version "${CARGO_CHEF_VERSION}" && \
cargo auditable install diesel_cli --locked --version "${CARGO_DIESEL_CLI_VERSION}" \
--features postgres-bundled --no-default-features && \
cargo install rustfilt --version ${RUSTFILT_VERSION} --locked && \
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} --locked && \
cargo install cargo-deny --version ${CARGO_DENY_VERSION} --locked && \
cargo install cargo-hack --version ${CARGO_HACK_VERSION} --locked && \
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} --locked && \
cargo install cargo-chef --version ${CARGO_CHEF_VERSION} --locked && \
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} --locked \
--features postgres-bundled --no-default-features && \
rm -rf /home/nonroot/.cargo/registry && \
rm -rf /home/nonroot/.cargo/git

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +0,0 @@
{
"name": "build-tools",
"private": true,
"devDependencies": {
"@redocly/cli": "1.34.5",
"@sourcemeta/jsonschema": "10.0.0"
}
}

View File

@@ -1,12 +1,9 @@
disallowed-methods = [
"tokio::task::block_in_place",
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
# tokio-epoll-uring:
# - allow-invalid because the method doesn't exist on macOS
{ path = "tokio_epoll_uring::thread_local_system", replacement = "tokio_epoll_uring_ext module inside pageserver crate", allow-invalid = true }
# use tokio_epoll_uring_ext instead
"tokio_epoll_uring::thread_local_system",
]
disallowed-macros = [

View File

@@ -50,9 +50,9 @@ jsonnetfmt-format:
jsonnetfmt --in-place $(jsonnet_files)
.PHONY: manifest-schema-validation
manifest-schema-validation: ../build-tools/node_modules
npx --prefix=../build-tools/ jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
manifest-schema-validation: node_modules
node_modules/.bin/jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
../build-tools/node_modules: ../build-tools/package.json
cd ../build-tools && $(if $(CI),npm ci,npm install)
touch ../build-tools/node_modules
node_modules: package.json
npm install
touch node_modules

View File

@@ -9,7 +9,7 @@
#
# build-tools: This contains Rust compiler toolchain and other tools needed at compile
# time. This is also used for the storage builds. This image is defined in
# build-tools/Dockerfile.
# build-tools.Dockerfile.
#
# build-deps: Contains C compiler, other build tools, and compile-time dependencies
# needed to compile PostgreSQL and most extensions. (Some extensions need
@@ -115,7 +115,7 @@ ARG EXTENSIONS=all
FROM $BASE_IMAGE_SHA AS build-deps
ARG DEBIAN_VERSION
# Keep in sync with build-tools/Dockerfile
# Keep in sync with build-tools.Dockerfile
ENV PROTOC_VERSION=25.1
# Use strict mode for bash to catch errors early
@@ -133,7 +133,7 @@ RUN case $DEBIAN_VERSION in \
# Install newer version (3.25) from backports.
# libstdc++-10-dev is required for plv8
bullseye) \
echo "deb http://archive.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
VERSION_INSTALLS="cmake/bullseye-backports cmake-data/bullseye-backports libstdc++-10-dev"; \
;; \
# Version-specific installs for Bookworm (PG17):
@@ -170,29 +170,7 @@ RUN case $DEBIAN_VERSION in \
FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION:?} postgres
COPY compute/patches/postgres_fdw.patch .
COPY compute/patches/pg_stat_statements_pg14-16.patch .
COPY compute/patches/pg_stat_statements_pg17.patch .
RUN cd postgres && \
# Apply patches to some contrib extensions
# For example, we need to grant EXECUTE on pg_stat_statements_reset() to {privileged_role_name}.
# In vanilla Postgres this function is limited to Postgres role superuser.
# In Neon we have {privileged_role_name} role that is not a superuser but replaces superuser in some cases.
# We could add the additional grant statements to the Postgres repository but it would be hard to maintain,
# whenever we need to pick up a new Postgres version and we want to limit the changes in our Postgres fork,
# so we do it here.
case "${PG_VERSION}" in \
"v14" | "v15" | "v16") \
patch -p1 < /pg_stat_statements_pg14-16.patch; \
;; \
"v17") \
patch -p1 < /pg_stat_statements_pg17.patch; \
;; \
*) \
# To do not forget to migrate patches to the next major version
echo "No contrib patches for this PostgreSQL version" && exit 1;; \
esac && \
patch -p1 < /postgres_fdw.patch && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3 -fsigned-char' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION:?}" != "v14" ]; then \
@@ -206,6 +184,8 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/dblink.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgres_fdw.control && \
file=/usr/local/pgsql/share/extension/postgres_fdw--1.0.sql && [ -e $file ] && \
echo 'GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw TO neon_superuser;' >> $file && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/bloom.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
@@ -215,7 +195,34 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control && \
# We need to grant EXECUTE on pg_stat_statements_reset() to neon_superuser.
# In vanilla postgres this function is limited to Postgres role superuser.
# In neon we have neon_superuser role that is not a superuser but replaces superuser in some cases.
# We could add the additional grant statements to the postgres repository but it would be hard to maintain,
# whenever we need to pick up a new postgres version and we want to limit the changes in our postgres fork,
# so we do it here.
for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql; do \
filename=$(basename "$file"); \
# Note that there are no downgrade scripts for pg_stat_statements, so we \
# don't have to modify any downgrade paths or (much) older versions: we only \
# have to make sure every creation of the pg_stat_statements_reset function \
# also adds execute permissions to the neon_superuser.
case $filename in \
pg_stat_statements--1.4.sql) \
# pg_stat_statements_reset is first created with 1.4
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO neon_superuser;' >> $file; \
;; \
pg_stat_statements--1.6--1.7.sql) \
# Then with the 1.6-1.7 migration it is re-created with a new signature, thus add the permissions back
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO neon_superuser;' >> $file; \
;; \
pg_stat_statements--1.10--1.11.sql) \
# Then with the 1.10-1.11 migration it is re-created with a new signature again, thus add the permissions back
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) TO neon_superuser;' >> $file; \
;; \
esac; \
done;
# Set PATH for all the subsequent build steps
ENV PATH="/usr/local/pgsql/bin:$PATH"
@@ -1517,7 +1524,7 @@ WORKDIR /ext-src
COPY compute/patches/pg_duckdb_v031.patch .
COPY compute/patches/duckdb_v120.patch .
# pg_duckdb build requires source dir to be a git repo to get submodules
# allow {privileged_role_name} to execute some functions that in pg_duckdb are available to superuser only:
# allow neon_superuser to execute some functions that in pg_duckdb are available to superuser only:
# - extension management function duckdb.install_extension()
# - access to duckdb.extensions table and its sequence
RUN git clone --depth 1 --branch v0.3.1 https://github.com/duckdb/pg_duckdb.git pg_duckdb-src && \
@@ -1565,7 +1572,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
FROM build-deps AS pgaudit-src
ARG PG_VERSION
WORKDIR /ext-src
COPY "compute/patches/pgaudit-parallel_workers-${PG_VERSION}.patch" .
RUN case "${PG_VERSION}" in \
"v14") \
export PGAUDIT_VERSION=1.6.3 \
@@ -1588,8 +1594,7 @@ RUN case "${PG_VERSION}" in \
esac && \
wget https://github.com/pgaudit/pgaudit/archive/refs/tags/${PGAUDIT_VERSION}.tar.gz -O pgaudit.tar.gz && \
echo "${PGAUDIT_CHECKSUM} pgaudit.tar.gz" | sha256sum --check && \
mkdir pgaudit-src && cd pgaudit-src && tar xzf ../pgaudit.tar.gz --strip-components=1 -C . && \
patch -p1 < "/ext-src/pgaudit-parallel_workers-${PG_VERSION}.patch"
mkdir pgaudit-src && cd pgaudit-src && tar xzf ../pgaudit.tar.gz --strip-components=1 -C .
FROM pg-build AS pgaudit-build
COPY --from=pgaudit-src /ext-src/ /ext-src/
@@ -1629,14 +1634,11 @@ RUN make install USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN)
# compile neon extensions
#
#########################################################################################
FROM pg-build-with-cargo AS neon-ext-build
FROM pg-build AS neon-ext-build
ARG PG_VERSION
USER root
COPY . .
RUN make -j $(getconf _NPROCESSORS_ONLN) -C pgxn -s install-compute \
BUILD_TYPE=release CARGO_BUILD_FLAGS="--locked --release" NEON_CARGO_ARTIFACT_TARGET_DIR="$(pwd)/target/release"
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) -C pgxn -s install-compute
#########################################################################################
#
@@ -1783,7 +1785,7 @@ RUN set -e \
#########################################################################################
FROM build-deps AS exporters
ARG TARGETARCH
# Keep sql_exporter version same as in build-tools/Dockerfile and
# Keep sql_exporter version same as in build-tools.Dockerfile and
# test_runner/regress/test_compute_metrics.py
# See comment on the top of the file regading `echo`, `-e` and `\n`
RUN if [ "$TARGETARCH" = "amd64" ]; then\
@@ -1908,10 +1910,10 @@ RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /e
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN echo /usr/local/pgsql/lib > /etc/ld.so.conf.d/00-neon.conf && /sbin/ldconfig
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq parallel \
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq \
&& apt clean && rm -rf /ext-src/*.tar.gz /ext-src/*.patch /var/lib/apt/lists/*
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute1
ENV PGHOST=compute
ENV PGPORT=55433
ENV PGUSER=cloud_admin
ENV PGDATABASE=postgres
@@ -1981,7 +1983,7 @@ RUN apt update && \
locales \
lsof \
procps \
rsyslog-gnutls \
rsyslog \
screen \
tcpdump \
$VERSION_INSTALLS && \

View File

@@ -8,8 +8,6 @@
import 'sql_exporter/compute_logical_snapshot_files.libsonnet',
import 'sql_exporter/compute_logical_snapshots_bytes.libsonnet',
import 'sql_exporter/compute_max_connections.libsonnet',
import 'sql_exporter/compute_pg_oldest_frozen_xid_age.libsonnet',
import 'sql_exporter/compute_pg_oldest_mxid_age.libsonnet',
import 'sql_exporter/compute_receive_lsn.libsonnet',
import 'sql_exporter/compute_subscriptions_count.libsonnet',
import 'sql_exporter/connection_counts.libsonnet',

View File

@@ -1 +1 @@
SELECT num_requested AS checkpoints_req FROM pg_catalog.pg_stat_checkpointer;
SELECT num_requested AS checkpoints_req FROM pg_stat_checkpointer;

View File

@@ -1 +1 @@
SELECT checkpoints_req FROM pg_catalog.pg_stat_bgwriter;
SELECT checkpoints_req FROM pg_stat_bgwriter;

View File

@@ -1 +1 @@
SELECT checkpoints_timed FROM pg_catalog.pg_stat_bgwriter;
SELECT checkpoints_timed FROM pg_stat_bgwriter;

View File

@@ -1 +1 @@
SELECT (neon.backpressure_throttling_time()::pg_catalog.float8 / 1000000) AS throttled;
SELECT (neon.backpressure_throttling_time()::float8 / 1000000) AS throttled;

View File

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_replay_lsn() - '0/0')::pg_catalog.FLOAT8
ELSE (pg_catalog.pg_current_wal_lsn() - '0/0')::pg_catalog.FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_replay_lsn() - '0/0')::FLOAT8
ELSE (pg_current_wal_lsn() - '0/0')::FLOAT8
END AS lsn;

View File

@@ -1,7 +1,7 @@
SELECT
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COUNT(*) FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;

View File

@@ -1,7 +1,7 @@
SELECT
(SELECT pg_catalog.current_setting('neon.timeline_id')) AS timeline_id,
(SELECT current_setting('neon.timeline_id')) AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(pg_catalog.sum(size), 0) FROM pg_catalog.pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;
(SELECT COALESCE(sum(size), 0) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;

View File

@@ -1,9 +1,9 @@
SELECT
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(pg_catalog.sum((pg_catalog.pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
(SELECT COALESCE(sum((pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
) AS logical_snapshots_bytes;

View File

@@ -1 +1 @@
SELECT pg_catalog.current_setting('max_connections') AS max_connections;
SELECT current_setting('max_connections') as max_connections;

View File

@@ -1,13 +0,0 @@
{
metric_name: 'compute_pg_oldest_frozen_xid_age',
type: 'gauge',
help: 'Age of oldest XIDs that have not been frozen by VACUUM. An indicator of how long it has been since VACUUM last ran.',
key_labels: [
'database_name',
],
value_label: 'metric',
values: [
'frozen_xid_age',
],
query: importstr 'sql_exporter/compute_pg_oldest_frozen_xid_age.sql',
}

View File

@@ -1,4 +0,0 @@
SELECT datname database_name,
pg_catalog.age(datfrozenxid) frozen_xid_age
FROM pg_catalog.pg_database
ORDER BY frozen_xid_age DESC LIMIT 10;

View File

@@ -1,13 +0,0 @@
{
metric_name: 'compute_pg_oldest_mxid_age',
type: 'gauge',
help: 'Age of oldest MXIDs that have not been replaced by VACUUM. An indicator of how long it has been since VACUUM last ran.',
key_labels: [
'database_name',
],
value_label: 'metric',
values: [
'min_mxid_age',
],
query: importstr 'sql_exporter/compute_pg_oldest_mxid_age.sql',
}

View File

@@ -1,4 +0,0 @@
SELECT datname database_name,
pg_catalog.mxid_age(datminmxid) min_mxid_age
FROM pg_catalog.pg_database
ORDER BY min_mxid_age DESC LIMIT 10;

View File

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_receive_lsn() - '0/0')::pg_catalog.FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_receive_lsn() - '0/0')::FLOAT8
ELSE 0
END AS lsn;

View File

@@ -1 +1 @@
SELECT subenabled::pg_catalog.text AS enabled, pg_catalog.count(*) AS subscriptions_count FROM pg_catalog.pg_subscription GROUP BY subenabled;
SELECT subenabled::text AS enabled, count(*) AS subscriptions_count FROM pg_subscription GROUP BY subenabled;

View File

@@ -1 +1 @@
SELECT datname, state, pg_catalog.count(*) AS count FROM pg_catalog.pg_stat_activity WHERE state <> '' GROUP BY datname, state;
SELECT datname, state, count(*) AS count FROM pg_stat_activity WHERE state <> '' GROUP BY datname, state;

View File

@@ -1,5 +1,5 @@
SELECT pg_catalog.sum(pg_catalog.pg_database_size(datname)) AS total
FROM pg_catalog.pg_database
SELECT sum(pg_database_size(datname)) AS total
FROM pg_database
-- Ignore invalid databases, as we will likely have problems with
-- getting their size from the Pageserver.
WHERE datconnlimit != -2;

View File

@@ -3,6 +3,6 @@
-- minutes.
SELECT
x::pg_catalog.text AS duration_seconds,
x::text as duration_seconds,
neon.approximate_working_set_size_seconds(x) AS size
FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 60)) AS t (x);

View File

@@ -3,6 +3,6 @@
SELECT
x AS duration,
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::pg_catalog.interval)::pg_catalog.int4) AS size FROM (
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::interval)::int) AS size FROM (
VALUES ('5m'), ('15m'), ('1h')
) AS t (x);

View File

@@ -1 +1 @@
SELECT pg_catalog.pg_size_bytes(pg_catalog.current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;
SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;

View File

@@ -1,3 +1,3 @@
SELECT slot_name, (restart_lsn - '0/0')::pg_catalog.FLOAT8 AS restart_lsn
FROM pg_catalog.pg_replication_slots
SELECT slot_name, (restart_lsn - '0/0')::FLOAT8 as restart_lsn
FROM pg_replication_slots
WHERE slot_type = 'logical';

View File

@@ -1 +1 @@
SELECT setting::pg_catalog.int4 AS max_cluster_size FROM pg_catalog.pg_settings WHERE name = 'neon.max_cluster_size';
SELECT setting::int AS max_cluster_size FROM pg_settings WHERE name = 'neon.max_cluster_size';

View File

@@ -1,13 +1,13 @@
-- We export stats for 10 non-system databases. Without this limit it is too
-- easy to abuse the system by creating lots of databases.
SELECT pg_catalog.pg_database_size(datname) AS db_size,
SELECT pg_database_size(datname) AS db_size,
deadlocks,
tup_inserted AS inserted,
tup_updated AS updated,
tup_deleted AS deleted,
datname
FROM pg_catalog.pg_stat_database
FROM pg_stat_database
WHERE datname IN (
SELECT datname FROM pg_database
-- Ignore invalid databases, as we will likely have problems with

View File

@@ -3,4 +3,4 @@
-- replay LSN may have advanced past the receive LSN we are using for the
-- calculation.
SELECT GREATEST(0, pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_last_wal_receive_lsn(), pg_catalog.pg_last_wal_replay_lsn())) AS replication_delay_bytes;
SELECT GREATEST(0, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())) AS replication_delay_bytes;

View File

@@ -1,5 +1,5 @@
SELECT
CASE
WHEN pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM pg_catalog.now() - pg_catalog.pg_last_xact_replay_timestamp()))
WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()))
END AS replication_delay_seconds;

View File

@@ -1,10 +1,10 @@
SELECT
slot_name,
pg_catalog.pg_wal_lsn_diff(
pg_wal_lsn_diff(
CASE
WHEN pg_catalog.pg_is_in_recovery() THEN pg_catalog.pg_last_wal_replay_lsn()
ELSE pg_catalog.pg_current_wal_lsn()
WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn()
ELSE pg_current_wal_lsn()
END,
restart_lsn)::pg_catalog.FLOAT8 AS retained_wal
FROM pg_catalog.pg_replication_slots
restart_lsn)::FLOAT8 AS retained_wal
FROM pg_replication_slots
WHERE active = false;

View File

@@ -4,4 +4,4 @@ SELECT
WHEN wal_status = 'lost' THEN 1
ELSE 0
END AS wal_is_lost
FROM pg_catalog.pg_replication_slots;
FROM pg_replication_slots;

7
compute/package.json Normal file
View File

@@ -0,0 +1,7 @@
{
"name": "neon-compute",
"private": true,
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
}

View File

@@ -1,27 +1,16 @@
diff --git a/sql/anon.sql b/sql/anon.sql
index 0cdc769..5eab1d6 100644
index 0cdc769..f6cc950 100644
--- a/sql/anon.sql
+++ b/sql/anon.sql
@@ -1141,3 +1141,19 @@ $$
@@ -1141,3 +1141,8 @@ $$
-- TODO : https://en.wikipedia.org/wiki/L-diversity
-- TODO : https://en.wikipedia.org/wiki/T-closeness
+
+-- NEON Patches
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT ALL ON SCHEMA anon to %I', privileged_role_name);
+ EXECUTE format('GRANT ALL ON ALL TABLES IN SCHEMA anon TO %I', privileged_role_name);
+
+ IF current_setting('server_version_num')::int >= 150000 THEN
+ EXECUTE format('GRANT SET ON PARAMETER anon.transparent_dynamic_masking TO %I', privileged_role_name);
+ END IF;
+END $$;
+GRANT ALL ON SCHEMA anon to neon_superuser;
+GRANT ALL ON ALL TABLES IN SCHEMA anon TO neon_superuser;
diff --git a/sql/init.sql b/sql/init.sql
index 7da6553..9b6164b 100644
--- a/sql/init.sql

View File

@@ -21,21 +21,13 @@ index 3235cc8..6b892bc 100644
include Makefile.global
diff --git a/sql/pg_duckdb--0.2.0--0.3.0.sql b/sql/pg_duckdb--0.2.0--0.3.0.sql
index d777d76..3b54396 100644
index d777d76..af60106 100644
--- a/sql/pg_duckdb--0.2.0--0.3.0.sql
+++ b/sql/pg_duckdb--0.2.0--0.3.0.sql
@@ -1056,3 +1056,14 @@ GRANT ALL ON FUNCTION duckdb.cache(TEXT, TEXT) TO PUBLIC;
@@ -1056,3 +1056,6 @@ GRANT ALL ON FUNCTION duckdb.cache(TEXT, TEXT) TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_info() TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_delete(TEXT) TO PUBLIC;
GRANT ALL ON PROCEDURE duckdb.recycle_ddb() TO PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT ALL ON FUNCTION duckdb.install_extension(TEXT) TO %I', privileged_role_name);
+ EXECUTE format('GRANT ALL ON TABLE duckdb.extensions TO %I', privileged_role_name);
+ EXECUTE format('GRANT ALL ON SEQUENCE duckdb.extensions_table_seq TO %I', privileged_role_name);
+END $$;
+GRANT ALL ON FUNCTION duckdb.install_extension(TEXT) TO neon_superuser;
+GRANT ALL ON TABLE duckdb.extensions TO neon_superuser;
+GRANT ALL ON SEQUENCE duckdb.extensions_table_seq TO neon_superuser;

View File

@@ -1,11 +1,5 @@
commit 5eb393810cf7c7bafa4e394dad2e349e2a8cb2cb
Author: Alexey Masterov <alexey.masterov@databricks.com>
Date: Mon Jul 28 18:11:02 2025 +0200
Patch for pg_repack
diff --git a/regress/Makefile b/regress/Makefile
index bf6edcb..110e734 100644
index bf6edcb..89b4c7f 100644
--- a/regress/Makefile
+++ b/regress/Makefile
@@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\}
@@ -13,36 +7,18 @@ index bf6edcb..110e734 100644
#
-REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger
+REGRESS := init-extension noautovacuum repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger autovacuum
+REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger
USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/regress/expected/autovacuum.out b/regress/expected/autovacuum.out
new file mode 100644
index 0000000..e7f2363
--- /dev/null
+++ b/regress/expected/autovacuum.out
@@ -0,0 +1,7 @@
+ALTER SYSTEM SET autovacuum='on';
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
diff --git a/regress/expected/noautovacuum.out b/regress/expected/noautovacuum.out
new file mode 100644
index 0000000..fc7978e
--- /dev/null
+++ b/regress/expected/noautovacuum.out
@@ -0,0 +1,7 @@
+ALTER SYSTEM SET autovacuum='off';
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out
index 9f2e171..f6e4f8d 100644
--- a/regress/expected/init-extension.out
+++ b/regress/expected/init-extension.out
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
index 8d0a94e..63b68bf 100644
--- a/regress/expected/nosuper.out
@@ -74,22 +50,14 @@ index 8d0a94e..63b68bf 100644
INFO: repacking table "public.tbl_cluster"
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
DETAIL: query was: RESET lock_timeout
diff --git a/regress/sql/autovacuum.sql b/regress/sql/autovacuum.sql
new file mode 100644
index 0000000..a8eda63
--- /dev/null
+++ b/regress/sql/autovacuum.sql
@@ -0,0 +1,2 @@
+ALTER SYSTEM SET autovacuum='on';
+SELECT pg_reload_conf();
diff --git a/regress/sql/noautovacuum.sql b/regress/sql/noautovacuum.sql
new file mode 100644
index 0000000..13d4836
--- /dev/null
+++ b/regress/sql/noautovacuum.sql
@@ -0,0 +1,2 @@
+ALTER SYSTEM SET autovacuum='off';
+SELECT pg_reload_conf();
diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql
index 9f2e171..f6e4f8d 100644
--- a/regress/sql/init-extension.sql
+++ b/regress/sql/init-extension.sql
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
index 072f0fa..dbe60f8 100644
--- a/regress/sql/nosuper.sql

View File

@@ -1,34 +0,0 @@
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
index 58cdf600fce..8be57a996f6 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
@@ -46,3 +46,12 @@ GRANT SELECT ON pg_stat_statements TO PUBLIC;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset() FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO %I', privileged_role_name);
+END $$;
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
index 6fc3fed4c93..256345a8f79 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
@@ -20,3 +20,12 @@ LANGUAGE C STRICT PARALLEL SAFE;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO %I', privileged_role_name);
+END $$;

View File

@@ -1,52 +0,0 @@
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql b/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql
index 0bb2c397711..32764db1d8b 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql
@@ -80,3 +80,12 @@ LANGUAGE C STRICT PARALLEL SAFE;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) TO %I', privileged_role_name);
+END $$;
\ No newline at end of file
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
index 58cdf600fce..8be57a996f6 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
@@ -46,3 +46,12 @@ GRANT SELECT ON pg_stat_statements TO PUBLIC;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset() FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO %I', privileged_role_name);
+END $$;
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
index 6fc3fed4c93..256345a8f79 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
@@ -20,3 +20,12 @@ LANGUAGE C STRICT PARALLEL SAFE;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO %I', privileged_role_name);
+END $$;

View File

@@ -1,143 +0,0 @@
commit 7220bb3a3f23fa27207d77562dcc286f9a123313
Author: Tristan Partin <tristan.partin@databricks.com>
Date: 2025-06-23 02:09:31 +0000
Disable logging in parallel workers
When a query uses parallel workers, pgaudit will log the same query for
every parallel worker. This is undesireable since it can result in log
amplification for queries that use parallel workers.
Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
diff --git a/expected/pgaudit.out b/expected/pgaudit.out
index baa8011..a601375 100644
--- a/expected/pgaudit.out
+++ b/expected/pgaudit.out
@@ -2563,6 +2563,37 @@ COMMIT;
NOTICE: AUDIT: SESSION,12,4,MISC,COMMIT,,,COMMIT;,<not logged>
DROP TABLE part_test;
NOTICE: AUDIT: SESSION,13,1,DDL,DROP TABLE,,,DROP TABLE part_test;,<not logged>
+--
+-- Test logging in parallel workers
+SET pgaudit.log = 'read';
+SET pgaudit.log_client = on;
+SET pgaudit.log_level = 'notice';
+-- Force parallel execution for testing
+SET max_parallel_workers_per_gather = 2;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET min_parallel_table_scan_size = 0;
+SET min_parallel_index_scan_size = 0;
+-- Create table with enough data to trigger parallel execution
+CREATE TABLE parallel_test (id int, data text);
+INSERT INTO parallel_test SELECT generate_series(1, 1000), 'test data';
+SELECT count(*) FROM parallel_test;
+NOTICE: AUDIT: SESSION,14,1,READ,SELECT,,,SELECT count(*) FROM parallel_test;,<not logged>
+ count
+-------
+ 1000
+(1 row)
+
+-- Cleanup parallel test
+DROP TABLE parallel_test;
+RESET max_parallel_workers_per_gather;
+RESET parallel_tuple_cost;
+RESET parallel_setup_cost;
+RESET min_parallel_table_scan_size;
+RESET min_parallel_index_scan_size;
+RESET pgaudit.log;
+RESET pgaudit.log_client;
+RESET pgaudit.log_level;
-- Cleanup
-- Set client_min_messages up to warning to avoid noise
SET client_min_messages = 'warning';
diff --git a/pgaudit.c b/pgaudit.c
index 5e6fd38..ac9ded2 100644
--- a/pgaudit.c
+++ b/pgaudit.c
@@ -11,6 +11,7 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "access/relation.h"
@@ -1303,7 +1304,7 @@ pgaudit_ExecutorStart_hook(QueryDesc *queryDesc, int eflags)
{
AuditEventStackItem *stackItem = NULL;
- if (!internalStatement)
+ if (!internalStatement && !IsParallelWorker())
{
/* Push the audit even onto the stack */
stackItem = stack_push();
@@ -1384,7 +1385,7 @@ pgaudit_ExecutorCheckPerms_hook(List *rangeTabls, bool abort)
/* Log DML if the audit role is valid or session logging is enabled */
if ((auditOid != InvalidOid || auditLogBitmap != 0) &&
- !IsAbortedTransactionBlockState())
+ !IsAbortedTransactionBlockState() && !IsParallelWorker())
{
/* If auditLogRows is on, wait for rows processed to be set */
if (auditLogRows && auditEventStack != NULL)
@@ -1438,7 +1439,7 @@ pgaudit_ExecutorRun_hook(QueryDesc *queryDesc, ScanDirection direction, uint64 c
else
standard_ExecutorRun(queryDesc, direction, count, execute_once);
- if (auditLogRows && !internalStatement)
+ if (auditLogRows && !internalStatement && !IsParallelWorker())
{
/* Find an item from the stack by the query memory context */
stackItem = stack_find_context(queryDesc->estate->es_query_cxt);
@@ -1458,7 +1459,7 @@ pgaudit_ExecutorEnd_hook(QueryDesc *queryDesc)
AuditEventStackItem *stackItem = NULL;
AuditEventStackItem *auditEventStackFull = NULL;
- if (auditLogRows && !internalStatement)
+ if (auditLogRows && !internalStatement && !IsParallelWorker())
{
/* Find an item from the stack by the query memory context */
stackItem = stack_find_context(queryDesc->estate->es_query_cxt);
diff --git a/sql/pgaudit.sql b/sql/pgaudit.sql
index cc1374a..1870a60 100644
--- a/sql/pgaudit.sql
+++ b/sql/pgaudit.sql
@@ -1612,6 +1612,36 @@ COMMIT;
DROP TABLE part_test;
+--
+-- Test logging in parallel workers
+SET pgaudit.log = 'read';
+SET pgaudit.log_client = on;
+SET pgaudit.log_level = 'notice';
+
+-- Force parallel execution for testing
+SET max_parallel_workers_per_gather = 2;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET min_parallel_table_scan_size = 0;
+SET min_parallel_index_scan_size = 0;
+
+-- Create table with enough data to trigger parallel execution
+CREATE TABLE parallel_test (id int, data text);
+INSERT INTO parallel_test SELECT generate_series(1, 1000), 'test data';
+
+SELECT count(*) FROM parallel_test;
+
+-- Cleanup parallel test
+DROP TABLE parallel_test;
+RESET max_parallel_workers_per_gather;
+RESET parallel_tuple_cost;
+RESET parallel_setup_cost;
+RESET min_parallel_table_scan_size;
+RESET min_parallel_index_scan_size;
+RESET pgaudit.log;
+RESET pgaudit.log_client;
+RESET pgaudit.log_level;
+
-- Cleanup
-- Set client_min_messages up to warning to avoid noise
SET client_min_messages = 'warning';

View File

@@ -1,143 +0,0 @@
commit 29dc2847f6255541992f18faf8a815dfab79631a
Author: Tristan Partin <tristan.partin@databricks.com>
Date: 2025-06-23 02:09:31 +0000
Disable logging in parallel workers
When a query uses parallel workers, pgaudit will log the same query for
every parallel worker. This is undesireable since it can result in log
amplification for queries that use parallel workers.
Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
diff --git a/expected/pgaudit.out b/expected/pgaudit.out
index b22560b..73f0327 100644
--- a/expected/pgaudit.out
+++ b/expected/pgaudit.out
@@ -2563,6 +2563,37 @@ COMMIT;
NOTICE: AUDIT: SESSION,12,4,MISC,COMMIT,,,COMMIT;,<not logged>
DROP TABLE part_test;
NOTICE: AUDIT: SESSION,13,1,DDL,DROP TABLE,,,DROP TABLE part_test;,<not logged>
+--
+-- Test logging in parallel workers
+SET pgaudit.log = 'read';
+SET pgaudit.log_client = on;
+SET pgaudit.log_level = 'notice';
+-- Force parallel execution for testing
+SET max_parallel_workers_per_gather = 2;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET min_parallel_table_scan_size = 0;
+SET min_parallel_index_scan_size = 0;
+-- Create table with enough data to trigger parallel execution
+CREATE TABLE parallel_test (id int, data text);
+INSERT INTO parallel_test SELECT generate_series(1, 1000), 'test data';
+SELECT count(*) FROM parallel_test;
+NOTICE: AUDIT: SESSION,14,1,READ,SELECT,,,SELECT count(*) FROM parallel_test;,<not logged>
+ count
+-------
+ 1000
+(1 row)
+
+-- Cleanup parallel test
+DROP TABLE parallel_test;
+RESET max_parallel_workers_per_gather;
+RESET parallel_tuple_cost;
+RESET parallel_setup_cost;
+RESET min_parallel_table_scan_size;
+RESET min_parallel_index_scan_size;
+RESET pgaudit.log;
+RESET pgaudit.log_client;
+RESET pgaudit.log_level;
-- Cleanup
-- Set client_min_messages up to warning to avoid noise
SET client_min_messages = 'warning';
diff --git a/pgaudit.c b/pgaudit.c
index 5e6fd38..ac9ded2 100644
--- a/pgaudit.c
+++ b/pgaudit.c
@@ -11,6 +11,7 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "access/relation.h"
@@ -1303,7 +1304,7 @@ pgaudit_ExecutorStart_hook(QueryDesc *queryDesc, int eflags)
{
AuditEventStackItem *stackItem = NULL;
- if (!internalStatement)
+ if (!internalStatement && !IsParallelWorker())
{
/* Push the audit even onto the stack */
stackItem = stack_push();
@@ -1384,7 +1385,7 @@ pgaudit_ExecutorCheckPerms_hook(List *rangeTabls, bool abort)
/* Log DML if the audit role is valid or session logging is enabled */
if ((auditOid != InvalidOid || auditLogBitmap != 0) &&
- !IsAbortedTransactionBlockState())
+ !IsAbortedTransactionBlockState() && !IsParallelWorker())
{
/* If auditLogRows is on, wait for rows processed to be set */
if (auditLogRows && auditEventStack != NULL)
@@ -1438,7 +1439,7 @@ pgaudit_ExecutorRun_hook(QueryDesc *queryDesc, ScanDirection direction, uint64 c
else
standard_ExecutorRun(queryDesc, direction, count, execute_once);
- if (auditLogRows && !internalStatement)
+ if (auditLogRows && !internalStatement && !IsParallelWorker())
{
/* Find an item from the stack by the query memory context */
stackItem = stack_find_context(queryDesc->estate->es_query_cxt);
@@ -1458,7 +1459,7 @@ pgaudit_ExecutorEnd_hook(QueryDesc *queryDesc)
AuditEventStackItem *stackItem = NULL;
AuditEventStackItem *auditEventStackFull = NULL;
- if (auditLogRows && !internalStatement)
+ if (auditLogRows && !internalStatement && !IsParallelWorker())
{
/* Find an item from the stack by the query memory context */
stackItem = stack_find_context(queryDesc->estate->es_query_cxt);
diff --git a/sql/pgaudit.sql b/sql/pgaudit.sql
index 8052426..7f0667b 100644
--- a/sql/pgaudit.sql
+++ b/sql/pgaudit.sql
@@ -1612,6 +1612,36 @@ COMMIT;
DROP TABLE part_test;
+--
+-- Test logging in parallel workers
+SET pgaudit.log = 'read';
+SET pgaudit.log_client = on;
+SET pgaudit.log_level = 'notice';
+
+-- Force parallel execution for testing
+SET max_parallel_workers_per_gather = 2;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET min_parallel_table_scan_size = 0;
+SET min_parallel_index_scan_size = 0;
+
+-- Create table with enough data to trigger parallel execution
+CREATE TABLE parallel_test (id int, data text);
+INSERT INTO parallel_test SELECT generate_series(1, 1000), 'test data';
+
+SELECT count(*) FROM parallel_test;
+
+-- Cleanup parallel test
+DROP TABLE parallel_test;
+RESET max_parallel_workers_per_gather;
+RESET parallel_tuple_cost;
+RESET parallel_setup_cost;
+RESET min_parallel_table_scan_size;
+RESET min_parallel_index_scan_size;
+RESET pgaudit.log;
+RESET pgaudit.log_client;
+RESET pgaudit.log_level;
+
-- Cleanup
-- Set client_min_messages up to warning to avoid noise
SET client_min_messages = 'warning';

View File

@@ -1,143 +0,0 @@
commit cc708dde7ef2af2a8120d757102d2e34c0463a0f
Author: Tristan Partin <tristan.partin@databricks.com>
Date: 2025-06-23 02:09:31 +0000
Disable logging in parallel workers
When a query uses parallel workers, pgaudit will log the same query for
every parallel worker. This is undesireable since it can result in log
amplification for queries that use parallel workers.
Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
diff --git a/expected/pgaudit.out b/expected/pgaudit.out
index 8772054..9b66ac6 100644
--- a/expected/pgaudit.out
+++ b/expected/pgaudit.out
@@ -2556,6 +2556,37 @@ DROP SERVER fdw_server;
NOTICE: AUDIT: SESSION,11,1,DDL,DROP SERVER,,,DROP SERVER fdw_server;,<not logged>
DROP EXTENSION postgres_fdw;
NOTICE: AUDIT: SESSION,12,1,DDL,DROP EXTENSION,,,DROP EXTENSION postgres_fdw;,<not logged>
+--
+-- Test logging in parallel workers
+SET pgaudit.log = 'read';
+SET pgaudit.log_client = on;
+SET pgaudit.log_level = 'notice';
+-- Force parallel execution for testing
+SET max_parallel_workers_per_gather = 2;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET min_parallel_table_scan_size = 0;
+SET min_parallel_index_scan_size = 0;
+-- Create table with enough data to trigger parallel execution
+CREATE TABLE parallel_test (id int, data text);
+INSERT INTO parallel_test SELECT generate_series(1, 1000), 'test data';
+SELECT count(*) FROM parallel_test;
+NOTICE: AUDIT: SESSION,13,1,READ,SELECT,,,SELECT count(*) FROM parallel_test;,<not logged>
+ count
+-------
+ 1000
+(1 row)
+
+-- Cleanup parallel test
+DROP TABLE parallel_test;
+RESET max_parallel_workers_per_gather;
+RESET parallel_tuple_cost;
+RESET parallel_setup_cost;
+RESET min_parallel_table_scan_size;
+RESET min_parallel_index_scan_size;
+RESET pgaudit.log;
+RESET pgaudit.log_client;
+RESET pgaudit.log_level;
-- Cleanup
-- Set client_min_messages up to warning to avoid noise
SET client_min_messages = 'warning';
diff --git a/pgaudit.c b/pgaudit.c
index 004d1f9..f061164 100644
--- a/pgaudit.c
+++ b/pgaudit.c
@@ -11,6 +11,7 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "access/relation.h"
@@ -1339,7 +1340,7 @@ pgaudit_ExecutorStart_hook(QueryDesc *queryDesc, int eflags)
{
AuditEventStackItem *stackItem = NULL;
- if (!internalStatement)
+ if (!internalStatement && !IsParallelWorker())
{
/* Push the audit even onto the stack */
stackItem = stack_push();
@@ -1420,7 +1421,7 @@ pgaudit_ExecutorCheckPerms_hook(List *rangeTabls, List *permInfos, bool abort)
/* Log DML if the audit role is valid or session logging is enabled */
if ((auditOid != InvalidOid || auditLogBitmap != 0) &&
- !IsAbortedTransactionBlockState())
+ !IsAbortedTransactionBlockState() && !IsParallelWorker())
{
/* If auditLogRows is on, wait for rows processed to be set */
if (auditLogRows && auditEventStack != NULL)
@@ -1475,7 +1476,7 @@ pgaudit_ExecutorRun_hook(QueryDesc *queryDesc, ScanDirection direction, uint64 c
else
standard_ExecutorRun(queryDesc, direction, count, execute_once);
- if (auditLogRows && !internalStatement)
+ if (auditLogRows && !internalStatement && !IsParallelWorker())
{
/* Find an item from the stack by the query memory context */
stackItem = stack_find_context(queryDesc->estate->es_query_cxt);
@@ -1495,7 +1496,7 @@ pgaudit_ExecutorEnd_hook(QueryDesc *queryDesc)
AuditEventStackItem *stackItem = NULL;
AuditEventStackItem *auditEventStackFull = NULL;
- if (auditLogRows && !internalStatement)
+ if (auditLogRows && !internalStatement && !IsParallelWorker())
{
/* Find an item from the stack by the query memory context */
stackItem = stack_find_context(queryDesc->estate->es_query_cxt);
diff --git a/sql/pgaudit.sql b/sql/pgaudit.sql
index 6aae88b..de6d7fd 100644
--- a/sql/pgaudit.sql
+++ b/sql/pgaudit.sql
@@ -1631,6 +1631,36 @@ DROP USER MAPPING FOR regress_user1 SERVER fdw_server;
DROP SERVER fdw_server;
DROP EXTENSION postgres_fdw;
+--
+-- Test logging in parallel workers
+SET pgaudit.log = 'read';
+SET pgaudit.log_client = on;
+SET pgaudit.log_level = 'notice';
+
+-- Force parallel execution for testing
+SET max_parallel_workers_per_gather = 2;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET min_parallel_table_scan_size = 0;
+SET min_parallel_index_scan_size = 0;
+
+-- Create table with enough data to trigger parallel execution
+CREATE TABLE parallel_test (id int, data text);
+INSERT INTO parallel_test SELECT generate_series(1, 1000), 'test data';
+
+SELECT count(*) FROM parallel_test;
+
+-- Cleanup parallel test
+DROP TABLE parallel_test;
+RESET max_parallel_workers_per_gather;
+RESET parallel_tuple_cost;
+RESET parallel_setup_cost;
+RESET min_parallel_table_scan_size;
+RESET min_parallel_index_scan_size;
+RESET pgaudit.log;
+RESET pgaudit.log_client;
+RESET pgaudit.log_level;
+
-- Cleanup
-- Set client_min_messages up to warning to avoid noise
SET client_min_messages = 'warning';

View File

@@ -1,143 +0,0 @@
commit 8d02e4c6c5e1e8676251b0717a46054267091cb4
Author: Tristan Partin <tristan.partin@databricks.com>
Date: 2025-06-23 02:09:31 +0000
Disable logging in parallel workers
When a query uses parallel workers, pgaudit will log the same query for
every parallel worker. This is undesireable since it can result in log
amplification for queries that use parallel workers.
Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
diff --git a/expected/pgaudit.out b/expected/pgaudit.out
index d696287..4b1059a 100644
--- a/expected/pgaudit.out
+++ b/expected/pgaudit.out
@@ -2568,6 +2568,37 @@ DROP SERVER fdw_server;
NOTICE: AUDIT: SESSION,11,1,DDL,DROP SERVER,,,DROP SERVER fdw_server,<not logged>
DROP EXTENSION postgres_fdw;
NOTICE: AUDIT: SESSION,12,1,DDL,DROP EXTENSION,,,DROP EXTENSION postgres_fdw,<not logged>
+--
+-- Test logging in parallel workers
+SET pgaudit.log = 'read';
+SET pgaudit.log_client = on;
+SET pgaudit.log_level = 'notice';
+-- Force parallel execution for testing
+SET max_parallel_workers_per_gather = 2;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET min_parallel_table_scan_size = 0;
+SET min_parallel_index_scan_size = 0;
+-- Create table with enough data to trigger parallel execution
+CREATE TABLE parallel_test (id int, data text);
+INSERT INTO parallel_test SELECT generate_series(1, 1000), 'test data';
+SELECT count(*) FROM parallel_test;
+NOTICE: AUDIT: SESSION,13,1,READ,SELECT,,,SELECT count(*) FROM parallel_test,<not logged>
+ count
+-------
+ 1000
+(1 row)
+
+-- Cleanup parallel test
+DROP TABLE parallel_test;
+RESET max_parallel_workers_per_gather;
+RESET parallel_tuple_cost;
+RESET parallel_setup_cost;
+RESET min_parallel_table_scan_size;
+RESET min_parallel_index_scan_size;
+RESET pgaudit.log;
+RESET pgaudit.log_client;
+RESET pgaudit.log_level;
-- Cleanup
-- Set client_min_messages up to warning to avoid noise
SET client_min_messages = 'warning';
diff --git a/pgaudit.c b/pgaudit.c
index 1764af1..0e48875 100644
--- a/pgaudit.c
+++ b/pgaudit.c
@@ -11,6 +11,7 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "access/relation.h"
@@ -1406,7 +1407,7 @@ pgaudit_ExecutorStart_hook(QueryDesc *queryDesc, int eflags)
{
AuditEventStackItem *stackItem = NULL;
- if (!internalStatement)
+ if (!internalStatement && !IsParallelWorker())
{
/* Push the audit event onto the stack */
stackItem = stack_push();
@@ -1489,7 +1490,7 @@ pgaudit_ExecutorCheckPerms_hook(List *rangeTabls, List *permInfos, bool abort)
/* Log DML if the audit role is valid or session logging is enabled */
if ((auditOid != InvalidOid || auditLogBitmap != 0) &&
- !IsAbortedTransactionBlockState())
+ !IsAbortedTransactionBlockState() && !IsParallelWorker())
{
/* If auditLogRows is on, wait for rows processed to be set */
if (auditLogRows && auditEventStack != NULL)
@@ -1544,7 +1545,7 @@ pgaudit_ExecutorRun_hook(QueryDesc *queryDesc, ScanDirection direction, uint64 c
else
standard_ExecutorRun(queryDesc, direction, count, execute_once);
- if (auditLogRows && !internalStatement)
+ if (auditLogRows && !internalStatement && !IsParallelWorker())
{
/* Find an item from the stack by the query memory context */
stackItem = stack_find_context(queryDesc->estate->es_query_cxt);
@@ -1564,7 +1565,7 @@ pgaudit_ExecutorEnd_hook(QueryDesc *queryDesc)
AuditEventStackItem *stackItem = NULL;
AuditEventStackItem *auditEventStackFull = NULL;
- if (auditLogRows && !internalStatement)
+ if (auditLogRows && !internalStatement && !IsParallelWorker())
{
/* Find an item from the stack by the query memory context */
stackItem = stack_find_context(queryDesc->estate->es_query_cxt);
diff --git a/sql/pgaudit.sql b/sql/pgaudit.sql
index e161f01..c873098 100644
--- a/sql/pgaudit.sql
+++ b/sql/pgaudit.sql
@@ -1637,6 +1637,36 @@ DROP USER MAPPING FOR regress_user1 SERVER fdw_server;
DROP SERVER fdw_server;
DROP EXTENSION postgres_fdw;
+--
+-- Test logging in parallel workers
+SET pgaudit.log = 'read';
+SET pgaudit.log_client = on;
+SET pgaudit.log_level = 'notice';
+
+-- Force parallel execution for testing
+SET max_parallel_workers_per_gather = 2;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET min_parallel_table_scan_size = 0;
+SET min_parallel_index_scan_size = 0;
+
+-- Create table with enough data to trigger parallel execution
+CREATE TABLE parallel_test (id int, data text);
+INSERT INTO parallel_test SELECT generate_series(1, 1000), 'test data';
+
+SELECT count(*) FROM parallel_test;
+
+-- Cleanup parallel test
+DROP TABLE parallel_test;
+RESET max_parallel_workers_per_gather;
+RESET parallel_tuple_cost;
+RESET parallel_setup_cost;
+RESET min_parallel_table_scan_size;
+RESET min_parallel_index_scan_size;
+RESET pgaudit.log;
+RESET pgaudit.log_client;
+RESET pgaudit.log_level;
+
-- Cleanup
-- Set client_min_messages up to warning to avoid noise
SET client_min_messages = 'warning';

View File

@@ -1,17 +0,0 @@
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql
index a0f0fc1bf45..ee077f2eea6 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql
@@ -16,3 +16,12 @@ LANGUAGE C STRICT;
CREATE FOREIGN DATA WRAPPER postgres_fdw
HANDLER postgres_fdw_handler
VALIDATOR postgres_fdw_validator;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw TO %I', privileged_role_name);
+END $$;

View File

@@ -26,13 +26,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
# Turn off database collector (`--no-collector.database`), we don't use `pg_database_size_bytes` metric anyway, see
# https://github.com/neondatabase/flux-fleet/blob/5e19b3fd897667b70d9a7ad4aa06df0ca22b49ff/apps/base/compute-metrics/scrape-compute-pg-exporter-neon.yaml#L29
# but it's enabled by default and it doesn't filter out invalid databases, see
# https://github.com/prometheus-community/postgres_exporter/blob/06a553c8166512c9d9c5ccf257b0f9bba8751dbc/collector/pg_database.go#L67
# so if it hits one, it starts spamming logs
# ERROR: [NEON_SMGR] [reqid d9700000018] could not read db size of db 705302 from page server at lsn 5/A2457EB0
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --no-collector.database --config.file=/etc/postgres_exporter.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn

View File

@@ -26,13 +26,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
# Turn off database collector (`--no-collector.database`), we don't use `pg_database_size_bytes` metric anyway, see
# https://github.com/neondatabase/flux-fleet/blob/5e19b3fd897667b70d9a7ad4aa06df0ca22b49ff/apps/base/compute-metrics/scrape-compute-pg-exporter-neon.yaml#L29
# but it's enabled by default and it doesn't filter out invalid databases, see
# https://github.com/prometheus-community/postgres_exporter/blob/06a553c8166512c9d9c5ccf257b0f9bba8751dbc/collector/pg_database.go#L67
# so if it hits one, it starts spamming logs
# ERROR: [NEON_SMGR] [reqid d9700000018] could not read db size of db 705302 from page server at lsn 5/A2457EB0
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --no-collector.database --config.file=/etc/postgres_exporter.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn

View File

@@ -27,10 +27,6 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
http-body-util.workspace = true
hostname-validator = "1.1"
hyper.workspace = true
hyper-util.workspace = true
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
@@ -47,7 +43,6 @@ postgres.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
ring = "0.17"
scopeguard.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
@@ -62,7 +57,6 @@ tokio-stream.workspace = true
tonic.workspace = true
tower-otel.workspace = true
tracing.workspace = true
tracing-appender.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
@@ -71,7 +65,7 @@ url.workspace = true
uuid.workspace = true
walkdir.workspace = true
x509-cert.workspace = true
postgres-types.workspace = true
postgres_versioninfo.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -46,20 +46,11 @@ stateDiagram-v2
Configuration --> Failed : Failed to configure the compute
Configuration --> Running : Compute has been configured
Empty --> Init : Compute spec is immediately available
Empty --> TerminationPendingFast : Requested termination
Empty --> TerminationPendingImmediate : Requested termination
Empty --> TerminationPending : Requested termination
Init --> Failed : Failed to start Postgres
Init --> Running : Started Postgres
Running --> TerminationPendingFast : Requested termination
Running --> TerminationPendingImmediate : Requested termination
Running --> ConfigurationPending : Received a /configure request with spec
Running --> RefreshConfigurationPending : Received a /refresh_configuration request, compute node will pull a new spec and reconfigure
RefreshConfigurationPending --> RefreshConfiguration: Received compute spec and started configuration
RefreshConfiguration --> Running : Compute has been re-configured
RefreshConfiguration --> RefreshConfigurationPending : Configuration failed and to be retried
TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status
TerminationPendingImmediate --> Terminated : Terminated compute immediately
Failed --> RefreshConfigurationPending : Received a /refresh_configuration request
Running --> TerminationPending : Requested termination
TerminationPending --> Terminated : Terminated compute
Failed --> [*] : Compute exited
Terminated --> [*] : Compute exited
```

View File

@@ -36,8 +36,6 @@
use std::ffi::OsString;
use std::fs::File;
use std::process::exit;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
@@ -49,10 +47,9 @@ use compute_tools::compute::{
BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
};
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;
use compute_tools::pg_isready::get_pg_isready_bin;
use compute_tools::spec::*;
use compute_tools::{hadron_metrics, installed_extensions, logger::*};
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;
@@ -82,29 +79,12 @@ struct Cli {
#[arg(long, default_value_t = 3081)]
pub internal_http_port: u16,
/// Backwards-compatible --http-port for Hadron deployments. Functionally the
/// same as --external-http-port.
#[arg(
long,
conflicts_with = "external_http_port",
conflicts_with = "internal_http_port"
)]
pub http_port: Option<u16>,
#[arg(short = 'D', long, value_name = "DATADIR")]
pub pgdata: String,
#[arg(short = 'C', long, value_name = "DATABASE_URL")]
pub connstr: String,
#[arg(
long,
default_value = "neon_superuser",
value_name = "PRIVILEGED_ROLE_NAME",
value_parser = Self::parse_privileged_role_name
)]
pub privileged_role_name: String,
#[cfg(target_os = "linux")]
#[arg(long, default_value = "neon-postgres")]
pub cgroup: String,
@@ -148,12 +128,6 @@ struct Cli {
/// Run in development mode, skipping VM-specific operations like process termination
#[arg(long, action = clap::ArgAction::SetTrue)]
pub dev: bool,
#[arg(long)]
pub pg_init_timeout: Option<u64>,
#[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
pub lakebase_mode: bool,
}
impl Cli {
@@ -173,41 +147,6 @@ impl Cli {
Ok(url)
}
/// For simplicity, we do not escape `privileged_role_name` anywhere in the code.
/// Since it's a system role, which we fully control, that's fine. Still, let's
/// validate it to avoid any surprises.
fn parse_privileged_role_name(value: &str) -> Result<String> {
use regex::Regex;
let pattern = Regex::new(r"^[a-z_]+$").unwrap();
if !pattern.is_match(value) {
bail!("--privileged-role-name can only contain lowercase letters and underscores")
}
Ok(value.to_string())
}
}
// Hadron helpers to get compatible compute_ctl http ports from Cli. The old `--http-port`
// arg is used and acts the same as `--external-http-port`. The internal http port is defined
// to be http_port + 1. Hadron runs in the dblet environment which uses the host network, so
// we need to be careful with the ports to choose.
fn get_external_http_port(cli: &Cli) -> u16 {
if cli.lakebase_mode {
return cli.http_port.unwrap_or(cli.external_http_port);
}
cli.external_http_port
}
fn get_internal_http_port(cli: &Cli) -> u16 {
if cli.lakebase_mode {
return cli
.http_port
.map(|p| p + 1)
.unwrap_or(cli.internal_http_port);
}
cli.internal_http_port
}
fn main() -> Result<()> {
@@ -224,38 +163,24 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
let mut log_dir = None;
if cli.lakebase_mode {
log_dir = std::env::var("COMPUTE_CTL_LOG_DIRECTORY").ok();
}
let (tracing_provider, _file_logs_guard) = init(cli.dev, log_dir)?;
runtime.block_on(init(cli.dev))?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
if cli.lakebase_mode {
installed_extensions::initialize_metrics();
hadron_metrics::initialize_metrics();
}
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
let config = get_config(&cli)?;
let external_http_port = get_external_http_port(&cli);
let internal_http_port = get_internal_http_port(&cli);
let compute_node = ComputeNode::new(
ComputeNodeParams {
compute_id: cli.compute_id,
connstr,
privileged_role_name: cli.privileged_role_name.clone(),
pgdata: cli.pgdata.clone(),
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port,
internal_http_port,
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port,
remote_ext_base_url: cli.remote_ext_base_url.clone(),
resize_swap_on_bind: cli.resize_swap_on_bind,
set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
@@ -265,35 +190,20 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
installed_extensions_collection_interval: Arc::new(AtomicU64::new(
cli.installed_extensions_collection_interval,
)),
pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
instance_id: std::env::var("INSTANCE_ID").ok(),
lakebase_mode: cli.lakebase_mode,
build_tag: BUILD_TAG.to_string(),
control_plane_uri: cli.control_plane_uri,
config_path_test_only: cli.config,
installed_extensions_collection_interval: cli.installed_extensions_collection_interval,
},
config,
)?;
let exit_code = compute_node.run().context("running compute node")?;
let exit_code = compute_node.run()?;
scenario.teardown();
deinit_and_exit(tracing_provider, exit_code);
deinit_and_exit(exit_code);
}
fn init(
dev_mode: bool,
log_dir: Option<String>,
) -> Result<(
Option<tracing_utils::Provider>,
Option<tracing_appender::non_blocking::WorkerGuard>,
)> {
let (provider, file_logs_guard) = init_tracing_and_logging(DEFAULT_LOG_LEVEL, &log_dir)?;
async fn init(dev_mode: bool) -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
@@ -304,7 +214,7 @@ fn init(
info!("compute build_tag: {}", &BUILD_TAG.to_string());
Ok((provider, file_logs_guard))
Ok(())
}
fn get_config(cli: &Cli) -> Result<ComputeConfig> {
@@ -329,27 +239,25 @@ fn get_config(cli: &Cli) -> Result<ComputeConfig> {
}
}
fn deinit_and_exit(tracing_provider: Option<tracing_utils::Provider>, exit_code: Option<i32>) -> ! {
if let Some(p) = tracing_provider {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
_ = p.shutdown();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
}
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
tracing_utils::shutdown_tracing();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
}
info!("shutting down");
@@ -415,49 +323,4 @@ mod test {
])
.expect_err("URL parameters are not allowed");
}
#[test]
fn verify_privileged_role_name() {
// Valid name
let cli = Cli::parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"my_superuser",
]);
assert_eq!(cli.privileged_role_name, "my_superuser");
// Invalid names
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"NeonSuperuser",
])
.expect_err("uppercase letters are not allowed");
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"$'neon_superuser",
])
.expect_err("special characters are not allowed");
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"",
])
.expect_err("empty name is not allowed");
}
}

View File

@@ -24,9 +24,9 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
});
let query = "
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = pg_catalog.now();";
SET updated_at = now();";
match client.simple_query(query).await {
Result::Ok(result) => {

View File

@@ -1,98 +0,0 @@
//! Client for making request to a running Postgres server's communicator control socket.
//!
//! The storage communicator process that runs inside Postgres exposes an HTTP endpoint in
//! a Unix Domain Socket in the Postgres data directory. This provides access to it.
use std::path::Path;
use anyhow::Context;
use hyper::client::conn::http1::SendRequest;
use hyper_util::rt::TokioIo;
/// Name of the socket within the Postgres data directory. This better match that in
/// `pgxn/neon/communicator/src/lib.rs`.
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
/// Open a connection to the communicator's control socket, prepare to send requests to it
/// with hyper.
pub async fn connect_communicator_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>>
where
B: hyper::body::Body + 'static + Send,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME);
let socket_path_len = socket_path.display().to_string().len();
// There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a
// Unix Domain socket. The limit is on the connect(2) function used to open the
// socket, not on the absolute path itself. Postgres changes the current directory to
// the data directory and uses a relative path to bind to the socket, and the relative
// path "./neon-communicator.socket" is always short, but when compute_ctl needs to
// open the socket, we need to use a full path, which can be arbitrarily long.
//
// There are a few ways we could work around this:
//
// 1. Change the current directory to the Postgres data directory and use a relative
// path in the connect(2) call. That's problematic because the current directory
// applies to the whole process. We could change the current directory early in
// compute_ctl startup, and that might be a good idea anyway for other reasons too:
// it would be more robust if the data directory is moved around or unlinked for
// some reason, and you would be less likely to accidentally litter other parts of
// the filesystem with e.g. temporary files. However, that's a pretty invasive
// change.
//
// 2. On Linux, you could open() the data directory, and refer to the the socket
// inside it as "/proc/self/fd/<fd>/neon-communicator.socket". But that's
// Linux-only.
//
// 3. Create a symbolic link to the socket with a shorter path, and use that.
//
// We use the symbolic link approach here. Hopefully the paths we use in production
// are shorter, so that we can open the socket directly, so that this hack is needed
// only in development.
let connect_result = if socket_path_len < 100 {
// We can open the path directly with no hacks.
tokio::net::UnixStream::connect(socket_path).await
} else {
// The path to the socket is too long. Create a symlink to it with a shorter path.
let short_path = std::env::temp_dir().join(format!(
"compute_ctl.short-socket.{}.{}",
std::process::id(),
tokio::task::id()
));
std::os::unix::fs::symlink(&socket_path, &short_path)?;
// Delete the symlink as soon as we have connected to it. There's a small chance
// of leaking if the process dies before we remove it, so try to keep that window
// as small as possible.
scopeguard::defer! {
if let Err(err) = std::fs::remove_file(&short_path) {
tracing::warn!("could not remove symlink \"{}\" created for socket: {}",
short_path.display(), err);
}
}
tracing::info!(
"created symlink \"{}\" for socket \"{}\", opening it now",
short_path.display(),
socket_path.display()
);
tokio::net::UnixStream::connect(&short_path).await
};
let stream = connect_result.context("connecting to communicator control socket")?;
let io = TokioIo::new(stream);
let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await?;
// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
if let Err(err) = connection.await {
eprintln!("Error in connection: {err}");
}
});
Ok(request_sender)
}

File diff suppressed because it is too large Load Diff

View File

@@ -5,10 +5,8 @@ use compute_api::responses::LfcOffloadState;
use compute_api::responses::LfcPrewarmState;
use http::StatusCode;
use reqwest::Client;
use std::mem::replace;
use std::sync::Arc;
use tokio::{io::AsyncReadExt, select, spawn};
use tokio_util::sync::CancellationToken;
use tokio::{io::AsyncReadExt, spawn};
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
@@ -71,7 +69,7 @@ impl ComputeNode {
}
};
let row = match client
.query_one("select * from neon.get_prewarm_info()", &[])
.query_one("select * from get_prewarm_info()", &[])
.await
{
Ok(row) => row,
@@ -90,38 +88,28 @@ impl ComputeNode {
self.state.lock().unwrap().lfc_offload_state.clone()
}
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
/// Has a failpoint "compute-prewarm"
/// Returns false if there is a prewarm request ongoing, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
let token: CancellationToken;
crate::metrics::LFC_PREWARM_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap();
token = state.lfc_prewarm_token.clone();
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
if let LfcPrewarmState::Prewarming =
replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
std::mem::replace(state, LfcPrewarmState::Prewarming)
{
return false;
}
}
crate::metrics::LFC_PREWARMS.inc();
let this = self.clone();
let cloned = self.clone();
spawn(async move {
let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
Ok(state) => state,
Err(err) => {
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "could not prewarm LFC");
let error = format!("{err:#}");
LfcPrewarmState::Failed { error }
}
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
error!(%err);
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
error: err.to_string(),
};
let state = &mut this.state.lock().unwrap();
if let LfcPrewarmState::Cancelled = prewarm_state {
state.lfc_prewarm_token = CancellationToken::new();
}
state.lfc_prewarm_state = prewarm_state;
});
true
}
@@ -132,151 +120,89 @@ impl ComputeNode {
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
}
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
async fn prewarm_impl(
&self,
from_endpoint: Option<String>,
token: CancellationToken,
) -> Result<LfcPrewarmState> {
let EndpointStoragePair {
url,
token: storage_token,
} = self.endpoint_storage_pair(from_endpoint)?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(storage_token);
let response = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
response = request.send() => response
}
.context("querying endpoint storage")?;
match response.status() {
StatusCode::OK => (),
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
status => bail!("{status} querying endpoint storage"),
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
let status = res.status();
if status != StatusCode::OK {
bail!("{status} querying endpoint storage")
}
let mut uncompressed = Vec::new();
let lfc_state = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
lfc_state = response.bytes() => lfc_state
}
.context("getting request body from endpoint storage")?;
let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
read = decoder.read_to_end(&mut uncompressed) => read
}
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
// Client connection and prewarm info querying are fast and therefore don't need
// cancellation
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
let lfc_state = res
.bytes()
.await
.context("connecting to postgres")?;
let pg_token = client.cancel_token();
.context("getting request body from endpoint storage")?;
ZstdDecoder::new(lfc_state.iter().as_slice())
.read_to_end(&mut uncompressed)
.await
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
select! {
res = client.query_one("select neon.prewarm_local_cache($1)", &params) => res,
_ = token.cancelled() => {
pg_token.cancel_query(postgres::NoTls).await
.context("cancelling neon.prewarm_local_cache()")?;
return Ok(LfcPrewarmState::Cancelled)
}
}
.context("loading LFC state into postgres")
.map(|_| ())?;
Ok(LfcPrewarmState::Completed)
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())
}
/// If offload request is ongoing, return false, true otherwise
/// Returns false if there is an offload request ongoing, true otherwise
pub fn offload_lfc(self: &Arc<Self>) -> bool {
crate::metrics::LFC_OFFLOAD_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
if let LfcOffloadState::Offloading =
std::mem::replace(state, LfcOffloadState::Offloading)
{
return false;
}
}
let cloned = self.clone();
spawn(async move { cloned.offload_lfc_with_state_update().await });
spawn(async move {
let Err(err) = cloned.offload_lfc_impl().await else {
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
error!(%err);
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: err.to_string(),
};
});
true
}
pub async fn offload_lfc_async(self: &Arc<Self>) {
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
return;
}
}
self.offload_lfc_with_state_update().await
}
async fn offload_lfc_with_state_update(&self) {
crate::metrics::LFC_OFFLOADS.inc();
let state = match self.offload_lfc_impl().await {
Ok(state) => state,
Err(err) => {
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC");
let error = format!("{err:#}");
LfcOffloadState::Failed { error }
}
};
self.state.lock().unwrap().lfc_offload_state = state;
}
async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.get_local_cache_state()", &[])
.await
.context("querying LFC state")?;
let state = row
.try_get::<usize, Option<&[u8]>>(0)
.context("deserializing LFC state")?;
let Some(state) = state else {
info!(%url, "empty LFC state, not exporting");
return Ok(LfcOffloadState::Skipped);
};
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();
ZstdEncoder::new(state)
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select get_local_cache_state()", &[])
.await
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)
.context("deserializing LFC state")
.map(ZstdEncoder::new)?
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;
let compressed_len = compressed.len();
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
let request = Client::new().put(url).bearer_auth(token).body(compressed);
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
Ok(res) => bail!(
"Request to endpoint storage failed with status: {}",
res.status()
),
Ok(res) if res.status() == StatusCode::OK => Ok(()),
Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
Err(err) => Err(err).context("writing to endpoint storage"),
}
}
pub fn cancel_prewarm(self: &Arc<Self>) {
self.state.lock().unwrap().lfc_prewarm_token.cancel();
}
}

View File

@@ -1,166 +0,0 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
use compute_api::spec::ComputeMode;
use itertools::Itertools;
use std::collections::HashMap;
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::info;
use utils::lsn::Lsn;
impl ComputeNode {
/// Returns only when promote fails or succeeds. If a network error occurs
/// and http client disconnects, this does not stop promotion, and subsequent
/// calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
/// Has a failpoint "compute-promotion"
pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let cloned = self.clone();
let promote_fn = async move || {
let Err(err) = cloned.promote_impl(cfg).await else {
return PromoteState::Completed;
};
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: format!("{err:#}"),
}
};
let start_promotion = || {
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
tokio::spawn(async move { tx.send(promote_fn().await) });
rx
};
let mut task;
// self.state is unlocked after block ends so we lock it in promote_impl
// and task.changed() is reached
{
task = self
.state
.lock()
.unwrap()
.promote_state
.get_or_insert_with(start_promotion)
.clone()
}
task.changed().await.expect("promote sender dropped");
task.borrow().clone()
}
async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
if *mode != ComputeMode::Replica {
bail!("{} is not replica", mode.to_type_str());
}
// we don't need to query Postgres so not self.lfc_prewarm_state()
match &state.lfc_prewarm_state {
LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
bail!("prewarm not requested or pending")
}
LfcPrewarmState::Failed { error } => {
tracing::warn!(%error, "replica prewarm failed")
}
_ => {}
}
}
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let primary_lsn = cfg.wal_flush_lsn;
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
let row = client
.query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
.await
.context("getting last replay lsn")?;
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
last_wal_replay_lsn = lsn.into();
if last_wal_replay_lsn >= primary_lsn {
break;
}
info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
sleep(Duration::from_secs(1)).await;
}
if last_wal_replay_lsn < primary_lsn {
bail!("didn't catch up with primary in {RETRIES} retries");
}
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
"ALTER SYSTEM SET neon.safekeepers='{}'",
cfg.spec.safekeeper_connstrings.join(",")
);
client
.query(&safekeepers_sql, &[])
.await
.context("setting safekeepers")?;
client
.query("SELECT pg_catalog.pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-promotion", |_| {
bail!("promotion configured to fail because of a failpoint")
});
let row = client
.query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
.await
.context("pg_promote")?;
if !row.get::<usize, bool>(0) {
bail!("pg_promote() returned false");
}
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let row = client
.query_one("SHOW transaction_read_only", &[])
.await
.context("getting transaction_read_only")?;
if row.get::<usize, &str>(0) == "on" {
bail!("replica in read only mode after promotion");
}
{
let mut state = self.state.lock().unwrap();
let spec = &mut state.pspec.as_mut().unwrap().spec;
spec.mode = ComputeMode::Primary;
let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
Self::merge_spec(new_conf, existing_conf);
}
info!("applied new spec, reconfiguring as primary");
self.reconfigure()
}
/// Merge old and new Postgres conf specs to apply on secondary.
/// Change new spec's port and safekeepers since they are supplied
/// differenly
fn merge_spec(new_conf: &mut String, existing_conf: &str) {
let mut new_conf_set: HashMap<&str, &str> = new_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.remove("neon.safekeepers");
let existing_conf_set: HashMap<&str, &str> = existing_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.insert("port", existing_conf_set["port"]);
*new_conf = new_conf_set
.iter()
.map(|(k, v)| format!("{k}={v}"))
.join("\n");
}
}

View File

@@ -7,19 +7,13 @@ use std::io::prelude::*;
use std::path::Path;
use compute_api::responses::TlsConfig;
use compute_api::spec::{
ComputeAudit, ComputeMode, ComputeSpec, DatabricksSettings, GenericOption,
};
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use crate::compute::ComputeNodeParams;
use crate::pg_helpers::{
DatabricksSettingsExt as _, GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize,
escape_conf_value,
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
};
use crate::tls::{self, SERVER_CRT, SERVER_KEY};
use utils::shard::{ShardIndex, ShardNumber};
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
@@ -45,16 +39,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
}
/// Create or completely rewrite configuration file specified by `path`
#[allow(clippy::too_many_arguments)]
pub fn write_postgres_conf(
pgdata_path: &Path,
params: &ComputeNodeParams,
spec: &ComputeSpec,
postgres_port: Option<u16>,
extension_server_port: u16,
tls_config: &Option<TlsConfig>,
databricks_settings: Option<&DatabricksSettings>,
lakebase_mode: bool,
) -> Result<()> {
let path = pgdata_path.join("postgresql.conf");
// File::create() destroys the file content if it exists.
@@ -65,53 +54,34 @@ pub fn write_postgres_conf(
writeln!(file, "{conf}")?;
}
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
writeln!(file)?;
if let Some(conninfo) = &spec.pageserver_connection_info {
let mut libpq_urls: Option<Vec<String>> = Some(Vec::new());
let num_shards = if conninfo.shard_count.0 == 0 {
1 // unsharded, treat it as a single shard
} else {
conninfo.shard_count.0
};
let mut grpc_urls: Option<Vec<String>> = Some(Vec::new());
for shard_number in 0..num_shards {
let shard_index = ShardIndex {
shard_number: ShardNumber(shard_number),
shard_count: conninfo.shard_count,
};
let info = conninfo.shards.get(&shard_index).ok_or_else(|| {
anyhow::anyhow!(
"shard {shard_index} missing from pageserver_connection_info shard map"
)
for shardno in 0..conninfo.shards.len() {
let info = conninfo.shards.get(&(shardno as u32)).ok_or_else(|| {
anyhow::anyhow!("shard {shardno} missing from pageserver_connection_info shard map")
})?;
let first_pageserver = info
.pageservers
.first()
.expect("must have at least one pageserver");
// Add the libpq URL to the array, or if the URL is missing, reset the array
// forgetting any previous entries. All servers must have a libpq URL, or none
// at all.
if let Some(url) = &first_pageserver.libpq_url {
if let Some(url) = &info.libpq_url {
if let Some(ref mut urls) = libpq_urls {
urls.push(url.clone());
}
} else {
libpq_urls = None
}
if let Some(url) = &info.grpc_url {
if let Some(ref mut urls) = grpc_urls {
urls.push(url.clone());
}
} else {
grpc_urls = None
}
}
if let Some(libpq_urls) = libpq_urls {
writeln!(
file,
"# derived from compute spec's pageserver_conninfo field"
)?;
writeln!(
file,
"neon.pageserver_connstring={}",
@@ -120,26 +90,20 @@ pub fn write_postgres_conf(
} else {
writeln!(file, "# no neon.pageserver_connstring")?;
}
if let Some(stripe_size) = conninfo.stripe_size {
if let Some(grpc_urls) = grpc_urls {
writeln!(
file,
"# from compute spec's pageserver_conninfo.stripe_size field"
"neon.pageserver_grpc_urls={}",
escape_conf_value(&grpc_urls.join(","))
)?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
} else {
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "# from compute spec's pageserver_connstring field")?;
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
}
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "# from compute spec's shard_stripe_size field")?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
} else {
writeln!(file, "# no neon.pageserver_grpc_urls")?;
}
}
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
if !spec.safekeeper_connstrings.is_empty() {
let mut neon_safekeepers_value = String::new();
tracing::info!(
@@ -239,12 +203,6 @@ pub fn write_postgres_conf(
}
}
writeln!(
file,
"neon.privileged_role_name={}",
escape_conf_value(params.privileged_role_name.as_str())
)?;
// If there are any extra options in the 'settings' field, append those
if spec.cluster.settings.is_some() {
writeln!(file, "# Managed by compute_ctl: begin")?;
@@ -360,24 +318,6 @@ pub fn write_postgres_conf(
writeln!(file, "log_destination='stderr,syslog'")?;
}
if lakebase_mode {
// Explicitly set the port based on the connstr, overriding any previous port setting.
// Note: It is important that we don't specify a different port again after this.
let port = postgres_port.expect("port must be present in connstr");
writeln!(file, "port = {port}")?;
// This is databricks specific settings.
// This should be at the end of the file but before `compute_ctl_temp_override.conf` below
// so that it can override any settings above.
// `compute_ctl_temp_override.conf` is intended to override any settings above during specific operations.
// To prevent potential breakage in the future, we keep it above `compute_ctl_temp_override.conf`.
writeln!(file, "# Databricks settings start")?;
if let Some(settings) = databricks_settings {
writeln!(file, "{}", settings.as_pg_settings())?;
}
writeln!(file, "# Databricks settings end")?;
}
// This is essential to keep this line at the end of the file,
// because it is intended to override any settings above.
writeln!(file, "include_if_exists = 'compute_ctl_temp_override.conf'")?;

View File

@@ -10,13 +10,7 @@ input(type="imfile" File="{log_directory}/*.log"
startmsg.regex="^[[:digit:]]{{4}}-[[:digit:]]{{2}}-[[:digit:]]{{2}} [[:digit:]]{{2}}:[[:digit:]]{{2}}:[[:digit:]]{{2}}.[[:digit:]]{{3}} GMT,")
# the directory to store rsyslog state files
global(
workDirectory="/var/log/rsyslog"
DefaultNetstreamDriverCAFile="/etc/ssl/certs/ca-certificates.crt"
)
# Whether the remote syslog receiver uses tls
set $.remote_syslog_tls = "{remote_syslog_tls}";
global(workDirectory="/var/log/rsyslog")
# Construct json, endpoint_id and project_id as additional metadata
set $.json_log!endpoint_id = "{endpoint_id}";
@@ -27,29 +21,5 @@ set $.json_log!msg = $msg;
template(name="PgAuditLog" type="string"
string="<%PRI%>1 %TIMESTAMP:::date-rfc3339% %HOSTNAME% - - - - %$.json_log%")
# Forward to remote syslog receiver (over TLS)
if ( $syslogtag == 'pgaudit_log' ) then {{
if ( $.remote_syslog_tls == 'true' ) then {{
action(type="omfwd" target="{remote_syslog_host}" port="{remote_syslog_port}" protocol="tcp"
template="PgAuditLog"
queue.type="linkedList"
queue.size="1000"
action.ResumeRetryCount="10"
StreamDriver="gtls"
StreamDriverMode="1"
StreamDriverAuthMode="x509/name"
StreamDriverPermittedPeers="{remote_syslog_host}"
StreamDriver.CheckExtendedKeyPurpose="on"
StreamDriver.PermitExpiredCerts="off"
)
stop
}} else {{
action(type="omfwd" target="{remote_syslog_host}" port="{remote_syslog_port}" protocol="tcp"
template="PgAuditLog"
queue.type="linkedList"
queue.size="1000"
action.ResumeRetryCount="10"
)
stop
}}
}}
# Forward to remote syslog receiver (@@<hostname>:<port>;format
local5.info @@{remote_endpoint};PgAuditLog

View File

@@ -1,40 +1,23 @@
use std::fs::File;
use std::sync::Arc;
use std::thread;
use std::{path::Path, sync::Arc};
use anyhow::Result;
use compute_api::responses::{ComputeConfig, ComputeStatus};
use compute_api::responses::ComputeStatus;
use tracing::{error, info, instrument};
use crate::compute::{ComputeNode, ParsedSpec};
use crate::spec::get_config_from_control_plane;
use crate::compute::ComputeNode;
#[instrument(skip_all)]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let mut state = compute.state.lock().unwrap();
/* BEGIN_HADRON */
// RefreshConfiguration should only be used inside the loop
assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
/* END_HADRON */
if compute.params.lakebase_mode {
while state.status != ComputeStatus::ConfigurationPending
&& state.status != ComputeStatus::RefreshConfigurationPending
&& state.status != ComputeStatus::Failed
{
info!("configurator: compute status: {:?}, sleeping", state.status);
state = compute.state_changed.wait(state).unwrap();
}
} else {
// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
}
// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
}
// Re-check the status after waking up
@@ -54,136 +37,6 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::RefreshConfigurationPending {
info!(
"compute node suspects its configuration is out of date, now refreshing configuration"
);
state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed);
// Drop the lock guard here to avoid holding the lock while downloading config from the control plane / HCC.
// This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it
// is safe to drop the lock like this.
drop(state);
let get_config_result: anyhow::Result<ComputeConfig> =
if let Some(config_path) = &compute.params.config_path_test_only {
// This path is only to make testing easier. In production we always get the config from the HCC.
info!(
"reloading config.json from path: {}",
config_path.to_string_lossy()
);
let path = Path::new(config_path);
if let Ok(file) = File::open(path) {
match serde_json::from_reader::<File, ComputeConfig>(file) {
Ok(config) => Ok(config),
Err(e) => {
error!("could not parse config file: {}", e);
Err(anyhow::anyhow!("could not parse config file: {}", e))
}
}
} else {
error!(
"could not open config file at path: {:?}",
config_path.to_string_lossy()
);
Err(anyhow::anyhow!(
"could not open config file at path: {}",
config_path.to_string_lossy()
))
}
} else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
get_config_from_control_plane(control_plane_uri, &compute.params.compute_id)
} else {
Err(anyhow::anyhow!("config_path_test_only is not set"))
};
// Parse any received ComputeSpec and transpose the result into a Result<Option<ParsedSpec>>.
let parsed_spec_result: Result<Option<ParsedSpec>> =
get_config_result.and_then(|config| {
if let Some(spec) = config.spec {
if let Ok(pspec) = ParsedSpec::try_from(spec) {
Ok(Some(pspec))
} else {
Err(anyhow::anyhow!("could not parse spec"))
}
} else {
Ok(None)
}
});
let new_status: ComputeStatus;
match parsed_spec_result {
// Control plane (HCM) returned a spec and we were able to parse it.
Ok(Some(pspec)) => {
{
let mut state = compute.state.lock().unwrap();
// Defensive programming to make sure this thread is indeed the only one that can move the compute
// node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant
// into the type system.
assert_eq!(state.status, ComputeStatus::RefreshConfiguration);
if state
.pspec
.as_ref()
.map(|ps| ps.pageserver_conninfo.clone())
== Some(pspec.pageserver_conninfo.clone())
{
info!(
"Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
);
state.status = ComputeStatus::Running;
compute.state_changed.notify_all();
drop(state);
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
// state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
// the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
// "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
// but it's not worth forking the codebase too much for this minor point alone right now.
state.pspec = Some(pspec);
}
match compute.reconfigure() {
Ok(_) => {
info!("Refresh configuration: compute node configured");
new_status = ComputeStatus::Running;
}
Err(e) => {
error!(
"Refresh configuration: could not configure compute node: {}",
e
);
// Set the compute node back to the `RefreshConfigurationPending` state if the configuration
// was not successful. It should be okay to treat this situation the same as if the loop
// hasn't executed yet as long as the detection side keeps notifying.
new_status = ComputeStatus::RefreshConfigurationPending;
}
}
}
// Control plane (HCM)'s response does not contain a spec. This is the "Empty" attachment case.
Ok(None) => {
info!(
"Compute Manager signaled that this compute is no longer attached to any storage. Exiting."
);
// We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a
// clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh
// configuration state).
std::process::exit(1);
}
// Various error cases:
// - The request to the control plane (HCM) either failed or returned a malformed spec.
// - compute_ctl itself is configured incorrectly (e.g., compute_id is not set).
Err(e) => {
error!(
"Refresh configuration: error getting a parsed spec: {:?}",
e
);
new_status = ComputeStatus::RefreshConfigurationPending;
// We may be dealing with an overloaded HCM if we end up in this path. Backoff 5 seconds before
// retrying to avoid hammering the HCM.
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");

View File

@@ -1,60 +0,0 @@
use metrics::{
IntCounter, IntGaugeVec, core::Collector, proto::MetricFamily, register_int_counter,
register_int_gauge_vec,
};
use once_cell::sync::Lazy;
// Counter keeping track of the number of PageStream request errors reported by Postgres.
// An error is registered every time Postgres calls compute_ctl's /refresh_configuration API.
// Postgres will invoke this API if it detected trouble with PageStream requests (get_page@lsn,
// get_base_backup, etc.) it sends to any pageserver. An increase in this counter value typically
// indicates Postgres downtime, as PageStream requests are critical for Postgres to function.
pub static POSTGRES_PAGESTREAM_REQUEST_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_pagestream_request_errors_total",
"Number of PageStream request errors reported by the postgres process"
)
.expect("failed to define a metric")
});
// Counter keeping track of the number of compute configuration errors due to Postgres statement
// timeouts. An error is registered every time `ComputeNode::reconfigure()` fails due to Postgres
// error code 57014 (query cancelled). This statement timeout typically occurs when postgres is
// stuck in a problematic retry loop when the PS is reject its connection requests (usually due
// to PG pointing at the wrong PS). We should investigate the root cause when this counter value
// increases by checking PG and PS logs.
pub static COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_configure_statement_timeout_errors_total",
"Number of compute configuration errors due to Postgres statement timeouts."
)
.expect("failed to define a metric")
});
pub static COMPUTE_ATTACHED: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pg_cctl_attached",
"Compute node attached status (1 if attached)",
&[
"pg_compute_id",
"pg_instance_id",
"tenant_id",
"timeline_id"
]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = Vec::new();
metrics.extend(POSTGRES_PAGESTREAM_REQUEST_ERRORS.collect());
metrics.extend(COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.collect());
metrics.extend(COMPUTE_ATTACHED.collect());
metrics
}
pub fn initialize_metrics() {
Lazy::force(&POSTGRES_PAGESTREAM_REQUEST_ERRORS);
Lazy::force(&COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS);
Lazy::force(&COMPUTE_ATTACHED);
}

View File

@@ -16,29 +16,13 @@ use crate::http::JsonResponse;
#[derive(Clone, Debug)]
pub(in crate::http) struct Authorize {
compute_id: String,
// BEGIN HADRON
// Hadron instance ID. Only set if it's a Lakebase V1 a.k.a. Hadron instance.
instance_id: Option<String>,
// END HADRON
jwks: JwkSet,
validation: Validation,
}
impl Authorize {
pub fn new(compute_id: String, instance_id: Option<String>, jwks: JwkSet) -> Self {
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
let mut validation = Validation::new(Algorithm::EdDSA);
// BEGIN HADRON
let use_rsa = jwks.keys.iter().any(|jwk| {
jwk.common
.key_algorithm
.is_some_and(|alg| alg == jsonwebtoken::jwk::KeyAlgorithm::RS256)
});
if use_rsa {
validation = Validation::new(Algorithm::RS256);
}
// END HADRON
validation.validate_exp = true;
// Unused by the control plane
validation.validate_nbf = false;
@@ -50,7 +34,6 @@ impl Authorize {
Self {
compute_id,
instance_id,
jwks,
validation,
}
@@ -64,20 +47,10 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
fn authorize(&mut self, mut request: Request<Body>) -> Self::Future {
let compute_id = self.compute_id.clone();
let is_hadron_instance = self.instance_id.is_some();
let jwks = self.jwks.clone();
let validation = self.validation.clone();
Box::pin(async move {
// BEGIN HADRON
// In Hadron deployments the "external" HTTP endpoint on compute_ctl can only be
// accessed by trusted components (enforced by dblet network policy), so we can bypass
// all auth here.
if is_hadron_instance {
return Ok(request);
}
// END HADRON
let TypedHeader(Authorization(bearer)) = request
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
.await

View File

@@ -83,96 +83,6 @@ paths:
schema:
$ref: "#/components/schemas/DbsAndRoles"
/promote:
post:
tags:
- Promotion
summary: Promote secondary replica to primary
description: ""
operationId: promoteReplica
requestBody:
description: Promote requests data
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeSchemaWithLsn"
responses:
200:
description: Promote succeeded or wasn't started
content:
application/json:
schema:
$ref: "#/components/schemas/PromoteState"
500:
description: Promote failed
content:
application/json:
schema:
$ref: "#/components/schemas/PromoteState"
/lfc/prewarm:
post:
summary: Request LFC Prewarm
parameters:
- name: from_endpoint
in: query
schema:
type: string
description: ""
operationId: lfcPrewarm
responses:
202:
description: LFC prewarm started
429:
description: LFC prewarm ongoing
get:
tags:
- Prewarm
summary: Get LFC prewarm state
description: ""
operationId: getLfcPrewarmState
responses:
200:
description: Prewarm state
content:
application/json:
schema:
$ref: "#/components/schemas/LfcPrewarmState"
delete:
tags:
- Prewarm
summary: Cancel ongoing LFC prewarm
description: ""
operationId: cancelLfcPrewarm
responses:
202:
description: Prewarm cancelled
/lfc/offload:
post:
summary: Request LFC offload
description: ""
operationId: lfcOffload
responses:
202:
description: LFC offload started
429:
description: LFC offload ongoing
get:
tags:
- Prewarm
summary: Get LFC offloading state
description: ""
operationId: getLfcOffloadState
responses:
200:
description: Offload state
content:
application/json:
schema:
$ref: "#/components/schemas/LfcOffloadState"
/database_schema:
get:
tags:
@@ -306,7 +216,14 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeSchema"
type: object
required:
- spec
properties:
spec:
# XXX: I don't want to explain current spec in the OpenAPI format,
# as it could be changed really soon. Consider doing it later.
type: object
responses:
200:
description: Compute configuration finished.
@@ -373,28 +290,9 @@ paths:
summary: Terminate Postgres and wait for it to exit
description: ""
operationId: terminate
parameters:
- name: mode
in: query
description: "Terminate mode: fast (wait 30s before returning) and immediate"
required: false
schema:
type: string
enum: ["fast", "immediate"]
default: fast
responses:
200:
description: Result
content:
application/json:
schema:
$ref: "#/components/schemas/TerminateResponse"
201:
description: Result if compute is already terminated
content:
application/json:
schema:
$ref: "#/components/schemas/TerminateResponse"
412:
description: "wrong state"
content:
@@ -437,6 +335,15 @@ components:
total_startup_ms:
type: integer
Info:
type: object
description: Information about VM/Pod.
required:
- num_cpus
properties:
num_cpus:
type: integer
DbsAndRoles:
type: object
description: Databases and Roles
@@ -551,14 +458,11 @@ components:
type: string
enum:
- empty
- configuration_pending
- init
- running
- configuration
- failed
- termination_pending_fast
- termination_pending_immediate
- terminated
- running
- configuration_pending
- configuration
example: running
ExtensionInstallRequest:
@@ -593,76 +497,25 @@ components:
type: string
example: "1.0.0"
ComputeSchema:
InstalledExtensions:
type: object
required:
- spec
properties:
spec:
type: object
ComputeSchemaWithLsn:
type: object
required:
- spec
- wal_flush_lsn
properties:
spec:
$ref: "#/components/schemas/ComputeState"
wal_flush_lsn:
type: string
description: "last WAL flush LSN"
example: "0/028F10D8"
LfcPrewarmState:
type: object
required:
- status
- total
- prewarmed
- skipped
properties:
status:
description: LFC prewarm status
enum: [not_prewarmed, prewarming, completed, failed, skipped]
type: string
error:
description: LFC prewarm error, if any
type: string
total:
description: Total pages processed
type: integer
prewarmed:
description: Total pages prewarmed
type: integer
skipped:
description: Pages processed but not prewarmed
type: integer
LfcOffloadState:
type: object
required:
- status
properties:
status:
description: LFC offload status
enum: [not_offloaded, offloading, completed, skipped, failed]
type: string
error:
description: LFC offload error, if any
type: string
PromoteState:
type: object
required:
- status
properties:
status:
description: Promote result
enum: [not_promoted, completed, failed]
type: string
error:
description: Promote error, if any
type: string
extensions:
description: Contains list of installed extensions.
type: array
items:
type: object
properties:
extname:
type: string
version:
type: string
items:
type: string
n_databases:
type: integer
owned_by_superuser:
type: integer
SetRoleGrantsRequest:
type: object
@@ -691,17 +544,6 @@ components:
description: Role name.
example: "neon"
TerminateResponse:
type: object
required:
- lsn
properties:
lsn:
type: string
nullable: true
description: "last WAL flush LSN"
example: "0/028F10D8"
SetRoleGrantsResponse:
type: object
required:

View File

@@ -43,12 +43,7 @@ pub(in crate::http) async fn configure(
// configure request for tracing purposes.
state.startup_span = Some(tracing::Span::current());
if compute.params.lakebase_mode {
ComputeNode::set_spec(&compute.params, &mut state, pspec);
} else {
state.pspec = Some(pspec);
}
state.pspec = Some(pspec);
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
drop(state);
}

View File

@@ -1,34 +0,0 @@
use crate::pg_isready::pg_isready;
use crate::{compute::ComputeNode, http::JsonResponse};
use axum::{extract::State, http::StatusCode, response::Response};
use std::sync::Arc;
/// NOTE: NOT ENABLED YET
/// Detect if the compute is alive.
/// Called by the liveness probe of the compute container.
pub(in crate::http) async fn hadron_liveness_probe(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
let port = match compute.params.connstr.port() {
Some(port) => port,
None => {
return JsonResponse::error(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to get the port from the connection string",
);
}
};
match pg_isready(&compute.params.pg_isready_bin, port) {
Ok(_) => {
// The connection is successful, so the compute is alive.
// Return a 200 OK response.
JsonResponse::success(StatusCode::OK, "ok")
}
Err(e) => {
tracing::error!("Hadron liveness probe failed: {}", e);
// The connection failed, so the compute is not alive.
// Return a 500 Internal Server Error response.
JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
}
}
}

View File

@@ -46,8 +46,3 @@ pub(in crate::http) async fn offload(compute: Compute) -> Response {
)
}
}
pub(in crate::http) async fn cancel_prewarm(compute: Compute) -> StatusCode {
compute.cancel_prewarm();
StatusCode::ACCEPTED
}

View File

@@ -1,19 +1,10 @@
use std::path::Path;
use std::sync::Arc;
use anyhow::Context;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use http_body_util::BodyExt;
use hyper::{Request, StatusCode};
use metrics::proto::MetricFamily;
use metrics::{Encoder, TextEncoder};
use crate::communicator_socket_client::connect_communicator_socket;
use crate::compute::ComputeNode;
use crate::hadron_metrics;
use crate::http::JsonResponse;
use crate::metrics::collect;
@@ -22,18 +13,11 @@ pub(in crate::http) async fn get_metrics() -> Response {
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let mut metrics = collect()
let metrics = collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
// Add Hadron metrics.
let hadron_metrics: Vec<MetricFamily> = hadron_metrics::collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect();
metrics.extend(hadron_metrics);
let encoder = TextEncoder::new();
let mut buffer = vec![];
@@ -47,42 +31,3 @@ pub(in crate::http) async fn get_metrics() -> Response {
.body(Body::from(buffer))
.unwrap()
}
/// Fetch and forward metrics from the Postgres neon extension's metrics
/// exporter that are used by autoscaling-agent.
///
/// The neon extension exposes these metrics over a Unix domain socket
/// in the data directory. That's not accessible directly from the outside
/// world, so we have this endpoint in compute_ctl to expose it
pub(in crate::http) async fn get_autoscaling_metrics(
State(compute): State<Arc<ComputeNode>>,
) -> Result<Response, Response> {
let pgdata = Path::new(&compute.params.pgdata);
// Connect to the communicator process's metrics socket
let mut metrics_client = connect_communicator_socket(pgdata)
.await
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Make a request for /autoscaling_metrics
let request = Request::builder()
.method("GET")
.uri("/autoscaling_metrics")
.header("Host", "localhost") // hyper requires Host, even though the server won't care
.body(Body::from(""))
.unwrap();
let resp = metrics_client
.send_request(request)
.await
.context("fetching metrics from Postgres metrics service")
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Build a response that just forwards the response we got.
let mut response = Response::builder();
response = response.status(resp.status());
if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
response = response.header(CONTENT_TYPE, content_type);
}
let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream());
Ok(response.body(body).unwrap())
}

View File

@@ -10,13 +10,10 @@ pub(in crate::http) mod extension_server;
pub(in crate::http) mod extensions;
pub(in crate::http) mod failpoints;
pub(in crate::http) mod grants;
pub(in crate::http) mod hadron_liveness_probe;
pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod promote;
pub(in crate::http) mod refresh_configuration;
pub(in crate::http) mod status;
pub(in crate::http) mod terminate;

View File

@@ -1,14 +0,0 @@
use crate::http::JsonResponse;
use axum::extract::Json;
use http::StatusCode;
pub(in crate::http) async fn promote(
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
Json(cfg): Json<compute_api::responses::PromoteConfig>,
) -> axum::response::Response {
let state = compute.promote(cfg).await;
if let compute_api::responses::PromoteState::Failed { error: _ } = state {
return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);
}
JsonResponse::success(StatusCode::OK, state)
}

View File

@@ -1,29 +0,0 @@
// This file is added by Hadron
use std::sync::Arc;
use axum::{
extract::State,
response::{IntoResponse, Response},
};
use http::StatusCode;
use crate::compute::ComputeNode;
use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
use crate::http::JsonResponse;
/// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
/// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
/// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
/// configuration to be reloaded in a best effort manner. Invocation of this method does not
/// guarantee that a reconfiguration will occur. The caller should consider keep sending this
/// request while it believes that the compute configuration is out of date.
pub(in crate::http) async fn refresh_configuration(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc();
match compute.signal_refresh_configuration().await {
Ok(_) => StatusCode::OK.into_response(),
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
}
}

View File

@@ -1,9 +1,9 @@
use crate::compute::{ComputeNode, forward_termination_signal};
use crate::http::JsonResponse;
use axum::extract::State;
use axum::response::{IntoResponse, Response};
use axum::response::Response;
use axum_extra::extract::OptionalQuery;
use compute_api::responses::{ComputeStatus, TerminateMode, TerminateResponse};
use compute_api::responses::{ComputeStatus, TerminateResponse};
use http::StatusCode;
use serde::Deserialize;
use std::sync::Arc;
@@ -12,7 +12,7 @@ use tracing::info;
#[derive(Deserialize, Default)]
pub struct TerminateQuery {
mode: TerminateMode,
mode: compute_api::responses::TerminateMode,
}
/// Terminate the compute.
@@ -24,38 +24,16 @@ pub(in crate::http) async fn terminate(
{
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::Terminated {
let response = TerminateResponse {
lsn: state.terminate_flush_lsn,
};
return JsonResponse::success(StatusCode::CREATED, response);
return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn);
}
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
return JsonResponse::invalid_status(state.status);
}
// If compute is Empty, there's no Postgres to terminate. The regular compute_ctl termination path
// assumes Postgres to be configured and running, so we just special-handle this case by exiting
// the process directly.
if compute.params.lakebase_mode && state.status == ComputeStatus::Empty {
drop(state);
info!("terminating empty compute - will exit process");
// Queue a task to exit the process after 5 seconds. The 5-second delay aims to
// give enough time for the HTTP response to be sent so that HCM doesn't get an abrupt
// connection termination.
tokio::spawn(async {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
info!("exiting process after terminating empty compute");
std::process::exit(0);
});
return StatusCode::OK.into_response();
}
// For Running status, proceed with normal termination
state.set_status(mode.into(), &compute.state_changed);
drop(state);
state.set_status(
ComputeStatus::TerminationPending { mode },
&compute.state_changed,
);
}
forward_termination_signal(false);

View File

@@ -23,8 +23,7 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, hadron_liveness_probe, insights, lfc, metrics, metrics_json, promote,
refresh_configuration, status, terminate,
grants, insights, lfc, metrics, metrics_json, status, terminate,
},
};
use crate::compute::ComputeNode;
@@ -44,7 +43,6 @@ pub enum Server {
port: u16,
config: ComputeCtlConfig,
compute_id: String,
instance_id: Option<String>,
},
}
@@ -69,12 +67,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
post(extension_server::download_extension),
)
.route("/extensions", post(extensions::install_extension))
.route("/grants", post(grants::add_grant))
// Hadron: Compute-initiated configuration refresh
.route(
"/refresh_configuration",
post(refresh_configuration::refresh_configuration),
);
.route("/grants", post(grants::add_grant));
// Add in any testing support
if cfg!(feature = "testing") {
@@ -86,27 +79,14 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
router
}
Server::External {
config,
compute_id,
instance_id,
..
config, compute_id, ..
} => {
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/metrics", get(metrics::get_metrics))
.route(
"/autoscaling_metrics",
get(metrics::get_autoscaling_metrics),
);
let unauthenticated_router =
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route(
"/lfc/prewarm",
get(lfc::prewarm_state)
.post(lfc::prewarm)
.delete(lfc::cancel_prewarm),
)
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))
@@ -115,13 +95,8 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
.route("/metrics.json", get(metrics_json::get_metrics))
.route("/status", get(status::get_status))
.route("/terminate", post(terminate::terminate))
.route(
"/hadron_liveness_probe",
get(hadron_liveness_probe::hadron_liveness_probe),
)
.layer(AsyncRequireAuthorizationLayer::new(Authorize::new(
compute_id.clone(),
instance_id.clone(),
config.jwks.clone(),
)));

View File

@@ -2,8 +2,6 @@ use std::collections::HashMap;
use anyhow::Result;
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use once_cell::sync::Lazy;
use tokio_postgres::error::Error as PostgresError;
use tokio_postgres::{Client, Config, NoTls};
use crate::metrics::INSTALLED_EXTENSIONS;
@@ -12,14 +10,14 @@ use crate::metrics::INSTALLED_EXTENSIONS;
/// and to make database listing query here more explicit.
///
/// Limit the number of databases to 500 to avoid excessive load.
async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
async fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
// `pg_database.datconnlimit = -2` means that the database is in the
// invalid state
let databases = client
.query(
"SELECT datname FROM pg_catalog.pg_database
WHERE datallowconn
AND datconnlimit OPERATOR(pg_catalog.<>) (OPERATOR(pg_catalog.-) 2::pg_catalog.int4)
AND datconnlimit <> - 2
LIMIT 500",
&[],
)
@@ -39,9 +37,7 @@ async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
/// Same extension can be installed in multiple databases with different versions,
/// so we report a separate metric (number of databases where it is installed)
/// for each extension version.
pub async fn get_installed_extensions(
mut conf: Config,
) -> Result<InstalledExtensions, PostgresError> {
pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExtensions> {
conf.application_name("compute_ctl:get_installed_extensions");
let databases: Vec<String> = {
let (mut client, connection) = conf.connect(NoTls).await?;
@@ -67,7 +63,7 @@ pub async fn get_installed_extensions(
let extensions: Vec<(String, String, i32)> = client
.query(
"SELECT extname, extversion, extowner::pg_catalog.int4 FROM pg_catalog.pg_extension",
"SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
&[],
)
.await?
@@ -120,7 +116,3 @@ pub async fn get_installed_extensions(
extensions: extensions_map.into_values().collect(),
})
}
pub fn initialize_metrics() {
Lazy::force(&INSTALLED_EXTENSIONS);
}

View File

@@ -4,7 +4,6 @@
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checker;
pub mod communicator_socket_client;
pub mod config;
pub mod configurator;
pub mod http;
@@ -13,10 +12,8 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod compute_prewarm;
pub mod compute_promote;
pub mod disk_quota;
pub mod extension_server;
pub mod hadron_metrics;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
@@ -25,7 +22,6 @@ mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pg_isready;
pub mod pgbouncer;
pub mod rsyslog;
pub mod spec;

View File

@@ -1,10 +1,7 @@
use std::collections::HashMap;
use std::sync::{LazyLock, RwLock};
use tracing::Subscriber;
use tracing::info;
use tracing_appender;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};
/// Initialize logging to stderr, and OpenTelemetry tracing and exporter.
///
@@ -16,63 +13,31 @@ use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};
/// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
/// `tracing-utils` package description.
///
pub fn init_tracing_and_logging(
default_log_level: &str,
log_dir_opt: &Option<String>,
) -> anyhow::Result<(
Option<tracing_utils::Provider>,
Option<tracing_appender::non_blocking::WorkerGuard>,
)> {
pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
// Initialize Logging
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
// Standard output streams
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.with_writer(std::io::stderr);
// Logs with file rotation. Files in `$log_dir/pgcctl.yyyy-MM-dd`
let (json_to_file_layer, _file_logs_guard) = if let Some(log_dir) = log_dir_opt {
std::fs::create_dir_all(log_dir)?;
let file_logs_appender = tracing_appender::rolling::RollingFileAppender::builder()
.rotation(tracing_appender::rolling::Rotation::DAILY)
.filename_prefix("pgcctl")
// Lib appends to existing files, so we will keep files for up to 2 days even on restart loops.
// At minimum, log-daemon will have 1 day to detect and upload a file (if created right before midnight).
.max_log_files(2)
.build(log_dir)
.expect("Initializing rolling file appender should succeed");
let (file_logs_writer, _file_logs_guard) =
tracing_appender::non_blocking(file_logs_appender);
let json_to_file_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.event_format(PgJsonLogShapeFormatter)
.with_writer(file_logs_writer);
(Some(json_to_file_layer), Some(_file_logs_guard))
} else {
(None, None)
};
// Initialize OpenTelemetry
let provider =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default());
let otlp_layer = provider.as_ref().map(tracing_utils::layer);
let otlp_layer =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await;
// Put it all together
tracing_subscriber::registry()
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
.with(json_to_file_layer)
.init();
tracing::info!("logging and tracing started");
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
Ok((provider, _file_logs_guard))
Ok(())
}
/// Replace all newline characters with a special character to make it
@@ -127,157 +92,3 @@ pub fn startup_context_from_env() -> Option<opentelemetry::Context> {
None
}
}
/// Track relevant id's
const UNKNOWN_IDS: &str = r#""pg_instance_id": "", "pg_compute_id": """#;
static IDS: LazyLock<RwLock<String>> = LazyLock::new(|| RwLock::new(UNKNOWN_IDS.to_string()));
pub fn update_ids(instance_id: &Option<String>, compute_id: &Option<String>) -> anyhow::Result<()> {
let ids = format!(
r#""pg_instance_id": "{}", "pg_compute_id": "{}""#,
instance_id.as_ref().map(|s| s.as_str()).unwrap_or_default(),
compute_id.as_ref().map(|s| s.as_str()).unwrap_or_default()
);
let mut guard = IDS
.write()
.map_err(|e| anyhow::anyhow!("Log set id's rwlock poisoned: {}", e))?;
*guard = ids;
Ok(())
}
/// Massage compute_ctl logs into PG json log shape so we can use the same Lumberjack setup.
struct PgJsonLogShapeFormatter;
impl<S, N> fmt::format::FormatEvent<S, N> for PgJsonLogShapeFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> fmt::format::FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &fmt::FmtContext<'_, S, N>,
mut writer: fmt::format::Writer<'_>,
event: &tracing::Event<'_>,
) -> std::fmt::Result {
// Format values from the event's metadata, and open message string
let metadata = event.metadata();
{
let ids_guard = IDS.read();
let ids = ids_guard
.as_ref()
.map(|guard| guard.as_str())
// Surpress so that we don't lose all uploaded/ file logs if something goes super wrong. We would notice the missing id's.
.unwrap_or(UNKNOWN_IDS);
write!(
&mut writer,
r#"{{"timestamp": "{}", "error_severity": "{}", "file_name": "{}", "backend_type": "compute_ctl_self", {}, "message": "#,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f GMT"),
metadata.level(),
metadata.target(),
ids
)?;
}
let mut message = String::new();
let message_writer = fmt::format::Writer::new(&mut message);
// Gather the message
ctx.field_format().format_fields(message_writer, event)?;
// TODO: any better options than to copy-paste this OSS span formatter?
// impl<S, N, T> FormatEvent<S, N> for Format<Full, T>
// https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/trait.FormatEvent.html#impl-FormatEvent%3CS,+N%3E-for-Format%3CFull,+T%3E
// write message, close bracket, and new line
writeln!(writer, "{}}}", serde_json::to_string(&message).unwrap())
}
}
#[cfg(feature = "testing")]
#[cfg(test)]
mod test {
use super::*;
use std::{cell::RefCell, io};
// Use thread_local! instead of Mutex for test isolation
thread_local! {
static WRITER_OUTPUT: RefCell<String> = const { RefCell::new(String::new()) };
}
#[derive(Clone, Default)]
struct StaticStringWriter;
impl io::Write for StaticStringWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let output = String::from_utf8(buf.to_vec()).expect("Invalid UTF-8 in test output");
WRITER_OUTPUT.with(|s| s.borrow_mut().push_str(&output));
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl fmt::MakeWriter<'_> for StaticStringWriter {
type Writer = Self;
fn make_writer(&self) -> Self::Writer {
Self
}
}
#[test]
fn test_log_pg_json_shape_formatter() {
// Use a scoped subscriber to prevent global state pollution
let subscriber = tracing_subscriber::registry().with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.event_format(PgJsonLogShapeFormatter)
.with_writer(StaticStringWriter),
);
let _ = update_ids(&Some("000".to_string()), &Some("111".to_string()));
// Clear any previous test state
WRITER_OUTPUT.with(|s| s.borrow_mut().clear());
let messages = [
"test message",
r#"json escape check: name="BatchSpanProcessor.Flush.ExportError" reason="Other(reqwest::Error { kind: Request, url: \"http://localhost:4318/v1/traces\", source: hyper_
util::client::legacy::Error(Connect, ConnectError(\"tcp connect error\", Os { code: 111, kind: ConnectionRefused, message: \"Connection refused\" })) })" Failed during the export process"#,
];
tracing::subscriber::with_default(subscriber, || {
for message in messages {
tracing::info!(message);
}
});
tracing::info!("not test message");
// Get captured output
let output = WRITER_OUTPUT.with(|s| s.borrow().clone());
let json_strings: Vec<&str> = output.lines().collect();
assert_eq!(
json_strings.len(),
messages.len(),
"Log didn't have the expected number of json strings."
);
let json_string_shape_regex = regex::Regex::new(
r#"\{"timestamp": "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} GMT", "error_severity": "INFO", "file_name": ".+", "backend_type": "compute_ctl_self", "pg_instance_id": "000", "pg_compute_id": "111", "message": ".+"\}"#
).unwrap();
for (i, expected_message) in messages.iter().enumerate() {
let json_string = json_strings[i];
assert!(
json_string_shape_regex.is_match(json_string),
"Json log didn't match expected pattern:\n{json_string}",
);
let parsed_json: serde_json::Value = serde_json::from_str(json_string).unwrap();
let actual_message = parsed_json["message"].as_str().unwrap();
assert_eq!(*expected_message, actual_message);
}
}
}

View File

@@ -4,13 +4,13 @@ use std::thread;
use std::time::{Duration, SystemTime};
use anyhow::{Result, bail};
use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverProtocol};
use compute_api::spec::{ComputeMode, PageserverConnectionInfo};
use pageserver_page_api as page_api;
use postgres::{NoTls, SimpleQueryMessage};
use tracing::{info, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use utils::shard::{ShardCount, ShardNumber, TenantShardId};
use crate::compute::ComputeNode;
@@ -116,38 +116,37 @@ fn try_acquire_lsn_lease(
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let shard_count = conninfo.shards.len();
let mut leases = Vec::new();
for (shard_index, shard) in conninfo.shards.into_iter() {
let tenant_shard_id = TenantShardId {
tenant_id,
shard_number: shard_index.shard_number,
shard_count: shard_index.shard_count,
for (shard_number, shard) in conninfo.shards.into_iter() {
let tenant_shard_id = match shard_count {
0 | 1 => TenantShardId::unsharded(tenant_id),
shard_count => TenantShardId {
tenant_id,
shard_number: ShardNumber(shard_number as u8),
shard_count: ShardCount::new(shard_count as u8),
},
};
// XXX: If there are more than pageserver for the one shard, do we need to get a
// leas on all of them? Currently, that's what we assume, but this is hypothetical
// as of this writing, as we never pass the info for more than one pageserver per
// shard.
for pageserver in shard.pageservers {
let lease = match conninfo.prefer_protocol {
PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
&pageserver.grpc_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?,
PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
&pageserver.libpq_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?,
};
leases.push(lease);
}
let lease = if conninfo.prefer_grpc {
acquire_lsn_lease_grpc(
&shard.grpc_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?
} else {
acquire_lsn_lease_libpq(
&shard.libpq_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?
};
leases.push(lease);
}
Ok(leases.into_iter().min().flatten())

View File

@@ -97,34 +97,20 @@ pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::
.expect("failed to define a metric")
});
pub(crate) static LFC_PREWARMS: Lazy<IntCounter> = Lazy::new(|| {
/// Needed as neon.file_cache_prewarm_batch == 0 doesn't mean we never tried to prewarm.
/// On the other hand, LFC_PREWARMED_PAGES is excessive as we can GET /lfc/prewarm
pub(crate) static LFC_PREWARM_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_prewarms_total",
"Total number of LFC prewarms requested by compute_ctl or autoprewarm option",
"compute_ctl_lfc_prewarm_requests_total",
"Total number of LFC prewarm requests made by compute_ctl",
)
.expect("failed to define a metric")
});
pub(crate) static LFC_PREWARM_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
pub(crate) static LFC_OFFLOAD_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_prewarm_errors_total",
"Total number of LFC prewarm errors",
)
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offloads_total",
"Total number of LFC offloads requested by compute_ctl or lfc_offload_period_seconds option",
)
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOAD_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offload_errors_total",
"Total number of LFC offload errors",
"compute_ctl_lfc_offload_requests_total",
"Total number of LFC offload requests made by compute_ctl",
)
.expect("failed to define a metric")
});
@@ -138,9 +124,7 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics.extend(LFC_PREWARMS.collect());
metrics.extend(LFC_PREWARM_ERRORS.collect());
metrics.extend(LFC_OFFLOADS.collect());
metrics.extend(LFC_OFFLOAD_ERRORS.collect());
metrics.extend(LFC_PREWARM_REQUESTS.collect());
metrics.extend(LFC_OFFLOAD_REQUESTS.collect());
metrics
}

View File

@@ -9,20 +9,15 @@ use crate::metrics::DB_MIGRATION_FAILED;
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
lakebase_mode: bool,
}
impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str], lakebase_mode: bool) -> Self {
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);
Self {
client,
migrations,
lakebase_mode,
}
Self { client, migrations }
}
/// Get the current value neon_migration.migration_id
@@ -76,7 +71,7 @@ impl<'m> MigrationRunner<'m> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
.await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key pg_catalog.int4 NOT NULL PRIMARY KEY, id pg_catalog.int8 NOT NULL DEFAULT 0)").await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
self.client
.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
@@ -135,13 +130,8 @@ impl<'m> MigrationRunner<'m> {
// ID is also the next index
let migration_id = (current_migration + 1) as i64;
let migration = self.migrations[current_migration];
let migration = if self.lakebase_mode {
migration.replace("neon_superuser", "databricks_superuser")
} else {
migration.to_string()
};
match Self::run_migration(self.client, migration_id, &migration).await {
match Self::run_migration(self.client, migration_id, migration).await {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}

View File

@@ -1 +0,0 @@
ALTER ROLE {privileged_role_name} BYPASSRLS;

Some files were not shown because too many files have changed in this diff Show More