Compare commits

20 Commits

Author SHA1 Message Date
Tristan Partin
0f5e118789 Add support for pgindent to GitHub Actions and pre-commit.py
Gate PRs on whether or not pgindent passes for C files in pgxn.
2025-07-22 10:07:41 -05:00
Heikki Linnakangas
8bb45fd5da Introduce built-in Prometheus exporter to the Postgres extension (#12591)
Currently, the exporter exposes the same LFC metrics that are exposed by
the "autoscaling" sql_exporter in the docker image. With this, we can
remove the dedicated sql_exporter instance. (Actually doing the removal
is left as a TODO until this is rolled out to production and we have
changed autoscaling-agent to fetch the metrics from this new endpoint.)

The exporter runs as a Postgres background worker process. This is
extracted from the Rust communicator rewrite project, which will use the
same worker process for much more, to handle the communications with the
pageservers. For now, though, it merely handles the metrics requests.

In the future, we will add more metrics, and perhaps even APIs to
control the running Postgres instance.

The exporter listens on a Unix Domain socket within the Postgres data
directory. A Unix Domain socket is a bit unconventional, but it has some
advantages:

- Permissions are taken care of. Only processes that can access the data
directory, and therefore already have full access to the running
Postgres instance, can connect to it.

- No need to allocate and manage a new port number for the listener

It has some downsides too: it's not immediately accessible from the
outside world, and the functions to work with Unix Domain sockets are
more low-level than TCP sockets (see the symlink hack in
`postgres_metrics_client.rs`, for example).
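
As a rough illustration of what talking to such a listener involves, here is a minimal sketch using only the Rust standard library. It is not the code from this commit: the function names, the `/metrics` request path, and the stand-in server are assumptions made for the example, and it does not reproduce the symlink hack mentioned above.

```rust
use std::io::{Read, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;
use std::thread;

/// Send a minimal HTTP/1.0 GET over a Unix Domain socket and return the raw
/// response. Hypothetical helper, not the actual client in the repository.
fn fetch_over_unix_socket(socket_path: &Path, http_path: &str) -> std::io::Result<String> {
    let mut stream = UnixStream::connect(socket_path)?;
    write!(stream, "GET {http_path} HTTP/1.0\r\nHost: localhost\r\n\r\n")?;
    let mut response = String::new();
    // With HTTP/1.0 the server closes the connection after responding,
    // so reading until EOF yields the whole response.
    stream.read_to_string(&mut response)?;
    Ok(response)
}

/// Stand-in for the exporter (assumption for the demo): accepts a single
/// connection, reads the request headers, and replies with a fixed body.
fn serve_once(listener: UnixListener, body: &'static str) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let (mut conn, _addr) = listener.accept().expect("accept failed");
        let mut buf = [0u8; 1024];
        let mut request = Vec::new();
        loop {
            let n = conn.read(&mut buf).expect("read failed");
            if n == 0 {
                break;
            }
            request.extend_from_slice(&buf[..n]);
            // Stop once the blank line that ends the headers has arrived.
            if request.windows(4).any(|w| w == b"\r\n\r\n") {
                break;
            }
        }
        write!(
            conn,
            "HTTP/1.0 200 OK\r\nContent-Length: {}\r\n\r\n{}",
            body.len(),
            body
        )
        .expect("write failed");
        // `conn` is dropped here, which closes the socket.
    })
}
```

Access control falls out of filesystem permissions on the socket path: only a process that can open the path can call `UnixStream::connect` on it.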

To expose the metrics from the local Unix Domain Socket to the
autoscaling agent, introduce a new '/autoscaling_metrics' endpoint in
the compute_ctl's HTTP server. Currently it merely forwards the request
to the Postgres instance, but we could add rate limiting and access
control there in the future.

---------

Co-authored-by: Conrad Ludgate <conrad@neon.tech>
2025-07-22 12:00:20 +00:00
Vlad Lazar
88bc06f148 communicator: debug log more fields of the get page response (#12644)
It's helpful to correlate requests and responses in local investigations
where the issue is reproducible. Hence, log the rel, fork and block of
the get page response.
2025-07-22 11:25:11 +00:00
Vlad Lazar
d91d018afa storcon: handle pageserver disk loss (#12667)
NB: effectively a no-op in the neon env since the handling is config gated
in storcon.

## Problem

When a pageserver suffers from a local disk/node failure and restarts,
the storage controller will receive a re-attach call and return all the
tenants the pageserver is supposed to attach, but the pageserver will not
act on any tenants that it doesn't know about locally. As a result, the
pageserver will not rehydrate any tenants from remote storage if it
restarted following a local disk loss, while the storage controller
still thinks that the pageserver has all the tenants attached. This
leaves the system in a bad state, and the symptom is that PG's
pageserver connections will fail with "tenant not found" errors.

## Summary of changes

Made a slight change to the storage controller's `re_attach` API:
* The pageserver will set an additional bit `empty_local_disk` in the
reattach request, indicating whether it has started with an empty disk
or does not know about any tenants.
* Upon receiving the reattach request, if this `empty_local_disk` bit is
set, the storage controller will go ahead and clear all observed
locations referencing the pageserver. The reconciler will then discover
the discrepancy between the intended state and observed state of the
tenant and take care of the situation.

To facilitate rollouts this extra behavior in the `re_attach` API is
guarded by the `handle_ps_local_disk_loss` command line flag of the
storage controller.
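
The flow described above can be sketched with a toy in-memory model. Everything here is hypothetical except the names `empty_local_disk`, `re_attach` and `handle_ps_local_disk_loss`, which come from the description: the real storage controller keeps far richer state, and the `Controller` struct, its maps, and `needs_reconcile` are assumptions made for illustration.

```rust
use std::collections::{HashMap, HashSet};

type NodeId = u64;
type TenantId = &'static str;

/// Hypothetical, heavily simplified storage-controller state.
struct Controller {
    /// Mirrors the `handle_ps_local_disk_loss` command line flag.
    handle_ps_local_disk_loss: bool,
    /// Intended state: the node each tenant should be attached to.
    intent: HashMap<TenantId, NodeId>,
    /// Observed state: the nodes each tenant is believed to be attached to.
    observed: HashMap<TenantId, HashSet<NodeId>>,
}

impl Controller {
    /// Handle a re-attach request and return the tenants the node should attach.
    fn re_attach(&mut self, node: NodeId, empty_local_disk: bool) -> Vec<TenantId> {
        if self.handle_ps_local_disk_loss && empty_local_disk {
            // The node lost its disk: forget every observed location on it.
            for nodes in self.observed.values_mut() {
                nodes.remove(&node);
            }
        }
        let mut tenants: Vec<TenantId> = self
            .intent
            .iter()
            .filter(|(_, n)| **n == node)
            .map(|(t, _)| *t)
            .collect();
        tenants.sort();
        tenants
    }

    /// Tenants whose intended node is absent from the observed state. In this
    /// sketch, these are what a reconciler would pick up and re-attach.
    fn needs_reconcile(&self) -> Vec<TenantId> {
        let mut out: Vec<TenantId> = self
            .intent
            .iter()
            .filter(|(t, n)| !self.observed.get(*t).map_or(false, |s| s.contains(*n)))
            .map(|(t, _)| *t)
            .collect();
        out.sort();
        out
    }
}
```

With the flag off, `re_attach` leaves the observed state untouched, which matches the note that the change is a no-op unless the flag is set.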

---------

Co-authored-by: William Huang <william.huang@databricks.com>
2025-07-22 11:04:03 +00:00
Folke Behrens
9c0efba91e Bump rand crate to 0.9 (#12674) 2025-07-22 09:31:39 +00:00
Konstantin Knizhnik
5464552020 Limit number of parallel config apply connections to 100 (#12663)
## Problem

See https://databricks.slack.com/archives/C092W8NBXC0/p1752924508578339

With a large number of databases and a large `max_connections`, we can
open too many connections for parallel config apply, which may cause a
`Too many open files` error.

## Summary of changes

Limit the maximum number of parallel config apply connections to 100.
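
One way to enforce such a cap is a fixed-size worker pool. The sketch below uses plain standard-library threads as a stand-in; it is an assumption-laden illustration, not the mechanism used in this commit. `apply_all`, `MAX_PARALLEL_APPLY`, and the `apply` callback are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Cap taken from the commit description; the constant name is hypothetical.
const MAX_PARALLEL_APPLY: usize = 100;

/// Run `apply` once per database with at most `limit` calls in flight.
/// Returns the peak number of concurrent calls that was observed.
fn apply_all<F>(databases: Vec<String>, limit: usize, apply: F) -> usize
where
    F: Fn(&str) + Send + Sync + 'static,
{
    // Never start more workers than there is work or than the cap allows.
    let workers = limit.min(databases.len());
    let queue = Arc::new(Mutex::new(databases));
    let apply = Arc::new(apply);
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let (queue, apply, in_flight, peak) =
                (queue.clone(), apply.clone(), in_flight.clone(), peak.clone());
            thread::spawn(move || loop {
                // Take the next database; the lock is released before the work starts.
                let next = queue.lock().unwrap().pop();
                let Some(db) = next else { break };
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                (*apply)(&db);
                in_flight.fetch_sub(1, Ordering::SeqCst);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

Because each worker holds at most one connection at a time, the number of open connections (and therefore file descriptors) is bounded by `limit` regardless of how many databases exist.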

---------

Co-authored-by: Konstantin Knizhnik <konstantin.knizhnik@databricks.com>
2025-07-22 04:39:54 +00:00
Arpad Müller
80baeaa084 storcon: add force_upsert flag to timeline_import endpoint (#12622)
It is useful to have the ability to update an existing timeline entry, as a
way to mirror legacy migrations to the storcon managed table.
2025-07-21 21:14:15 +00:00
Tristan Partin
b7bc3ce61e Skip PG throttle during configuration (#12670)
## Problem

While running tenant split tests I ran into a situation where PG got
stuck completely. This seems to be a general problem that was not found
in the previous chaos testing fixes.

What happened is that if PG gets throttled by PS, and SC decided to move
some tenant away, then PG reconfiguration could be blocked forever
because it cannot talk to the old PS anymore to refresh the throttling
stats, and reconfiguration cannot proceed because it's being throttled.
Neon has considered the case that configuration could be blocked if the
PG storage is full, but forgot the backpressure case.

## Summary of changes
The PR fixes this problem by simply skipping throttling while PS is
being configured, i.e., `max_cluster_size < 0`. An alternative fix is to
set those throttle knobs to -1 (e.g., max_replication_apply_lag),
however these knobs were labeled with PGC_POSTMASTER so their values
cannot be changed unless we restart PG.
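
The decision rule can be pictured as a small predicate. The actual change lives in the Postgres extension's C code; the Rust function below is only a sketch, and its name, parameters, and the treatment of a negative lag limit as "disabled" (suggested by the `-1` alternative above) are assumptions.

```rust
/// Hypothetical predicate: should this backend be throttled right now?
///
/// * `max_cluster_size < 0` is taken to mean "configuration in progress",
///   as in the description above, so throttling is skipped entirely.
/// * A negative `max_lag` is assumed to mean the knob is disabled.
fn should_throttle(max_cluster_size: i64, current_lag: u64, max_lag: i64) -> bool {
    if max_cluster_size < 0 {
        // Reconfiguration must always be able to make progress.
        return false;
    }
    if max_lag < 0 {
        return false;
    }
    current_lag > max_lag as u64
}
```

Checking the configuration state first is what breaks the deadlock: the stale throttling stats are never consulted while reconfiguration is under way.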

## How is this tested?
Tested manually.

Co-authored-by: Chen Luo <chen.luo@databricks.com>
2025-07-21 20:50:02 +00:00
Ivan Efremov
050c9f704f proxy: expose session_id to clients and proxy latency to probes (#12656)
Implements #8728
2025-07-21 20:27:15 +00:00
Ruslan Talpa
0dbe551802 proxy: subzero integration in auth-broker (embedded data-api) (#12474)
## Problem
We want to have the data-api served by the proxy directly instead of
relying on a 3rd party to run a deployment for each project/endpoint.

## Summary of changes
With the changes below, the proxy (auth-broker) also becomes a
"rest-broker", which can be thought of as a "multi-tenant" data-api that
provides an automated REST API for all the databases in the region.

The core of the implementation (that leverages the subzero library) is
in proxy/src/serverless/rest.rs and this is the only place that has "new
logic".

---------

Co-authored-by: Ruslan Talpa <ruslan.talpa@databricks.com>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Conrad Ludgate <conrad@neon.tech>
2025-07-21 18:16:28 +00:00
Tristan Partin
187170be47 Add max_wal_rate test (#12621)
## Problem
Add a test for max_wal_rate

## Summary of changes
Test max_wal_rate

## How is this tested?
python test

Co-authored-by: Haoyu Huang <haoyu.huang@databricks.com>
2025-07-21 17:58:03 +00:00
Vlad Lazar
30e1213141 pageserver: check env var for ip address before node registration (#12666)
Include the IP address (optionally read from an env var) in the
pageserver's registration request.
Note that the IP address is ignored by the storage controller at the
moment, which makes it a no-op in the neon env.
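
Reading an optional address from the environment might look roughly like this. The variable name `EXAMPLE_NODE_IP_ADDR` and both function names are placeholders invented for the sketch; the commit message does not name the real variable.

```rust
use std::net::{AddrParseError, IpAddr};

/// Placeholder variable name (assumption); not the name used by the pageserver.
const NODE_IP_ENV: &str = "EXAMPLE_NODE_IP_ADDR";

/// Parse an optional raw value. Unset or blank means "no address";
/// anything else must be a valid IPv4 or IPv6 address.
fn parse_node_ip(raw: Option<&str>) -> Result<Option<IpAddr>, AddrParseError> {
    match raw.map(str::trim).filter(|s| !s.is_empty()) {
        None => Ok(None),
        Some(s) => s.parse::<IpAddr>().map(Some),
    }
}

/// Read the address from the environment before building a registration request.
fn node_ip_from_env() -> Result<Option<IpAddr>, AddrParseError> {
    parse_node_ip(std::env::var(NODE_IP_ENV).ok().as_deref())
}
```

Keeping the parsing separate from the environment lookup makes the optional-field behavior easy to test without mutating process state.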
2025-07-21 15:32:28 +00:00
Vlad Lazar
25efbcc7f0 safekeeper: parallelise segment copy (#12664)
Parallelise segment copying on the SK. I'm not aware of the neon
deployment using this endpoint.
2025-07-21 14:47:58 +00:00
Conrad Ludgate
b2ecb10f91 [proxy] rework handling of notices in sql-over-http (#12659)
A replacement for #10254 which allows us to introduce notice messages
for sql-over-http in the future if we want to. This also removes the
`ParameterStatus` and `Notification` handling as there's nothing we
could/should do for those.
2025-07-21 12:50:13 +00:00
Erik Grinaker
5a48365fb9 pageserver/client_grpc: don't set stripe size for unsharded tenants (#12639)
## Problem

We've had bugs where the compute would use the stale default stripe size
from an unsharded tenant after the tenant split with a new stripe size.

## Summary of changes

Never specify a stripe size for unsharded tenants, to guard against
misuse. Only specify it once tenants are sharded and the stripe size
can't change.
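
A minimal sketch of that rule, assuming a shard count of 0 or 1 means "unsharded". The `ShardStripeSize` newtype below is a local stand-in defined for the example, and `stripe_size_for` is a hypothetical helper rather than the function changed in this commit.

```rust
/// Local stand-in for the real stripe size type (assumption for the sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ShardStripeSize(u32);

/// Return the stripe size to send, or `None` for unsharded tenants so that
/// nothing downstream can latch onto a value that may change at split time.
fn stripe_size_for(shard_count: u8, configured: ShardStripeSize) -> Option<ShardStripeSize> {
    if shard_count <= 1 {
        None
    } else {
        Some(configured)
    }
}
```

Using `Option` forces every caller to handle the "not yet meaningful" case explicitly instead of silently using a stale default.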

Also opportunistically changes `GetPageSplitter` to return
`anyhow::Result`, since we'll be using this in other code paths as well
(specifically during server-side shard splits).
2025-07-21 12:28:39 +00:00
Erik Grinaker
194b9ffc41 pageserver: remove gRPC CheckRelExists (#12616)
## Problem

Postgres will often immediately follow a relation existence check with a
relation size query. This incurs two roundtrips, and may prevent
effective caching.

See [Slack
thread](https://databricks.slack.com/archives/C091SDX74SC/p1751951732136139).

Touches #11728.

## Summary of changes

For the gRPC API:

* Add an `allow_missing` parameter to `GetRelSize`, which returns
`missing=true` instead of a `NotFound` error.
* Remove `CheckRelExists`.

There are no changes to libpq behavior.
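
The combined call can be pictured as follows. This is a sketch over an in-memory map, not the gRPC service: `allow_missing` and `missing` come from the description above, while the types, the `num_blocks` field, and the use of `u32` relation identifiers are assumptions.

```rust
use std::collections::HashMap;

/// Hypothetical error type standing in for a gRPC `NotFound` status.
#[derive(Debug, PartialEq)]
enum RelSizeError {
    NotFound,
}

/// Hypothetical response shape; only `missing` is named in the description.
#[derive(Debug, PartialEq)]
struct RelSizeResponse {
    num_blocks: u32,
    missing: bool,
}

/// Look up a relation size. With `allow_missing`, an absent relation is
/// reported in-band instead of as an error, so one round trip answers both
/// "does it exist?" and "how big is it?".
fn get_rel_size(
    rels: &HashMap<u32, u32>,
    rel: u32,
    allow_missing: bool,
) -> Result<RelSizeResponse, RelSizeError> {
    match rels.get(&rel) {
        Some(&num_blocks) => Ok(RelSizeResponse { num_blocks, missing: false }),
        None if allow_missing => Ok(RelSizeResponse { num_blocks: 0, missing: true }),
        None => Err(RelSizeError::NotFound),
    }
}
```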
2025-07-21 11:43:26 +00:00
Dimitri Fontaine
1e30b31fa7 Cherry pick: pg hooks for online table. (#12654)
## Problem

## Summary of changes
2025-07-21 11:10:10 +00:00
Erik Grinaker
e181b996c3 utils: move ShardStripeSize into shard module (#12640)
## Problem

`ShardStripeSize` will be used in the compute spec and internally in the
communicator. It shouldn't require pulling in all of `pageserver_api`.

## Summary of changes

Move `ShardStripeSize` into `utils::shard`, along with other basic shard
types. Also remove the `Default` implementation, to discourage clients
from falling back to a default (it's generally a footgun).

The type is still re-exported from `pageserver_api::shard`, along with
all the other shard types.
2025-07-21 10:56:20 +00:00
Erik Grinaker
1406bdc6a8 pageserver: improve gRPC cancellation (#12635)
## Problem

The gRPC page service does not properly react to shutdown cancellation.
In particular, Tonic considers an open GetPage stream to be an in-flight
request, so it will wait for it to complete before shutting down.

Touches [LKB-191](https://databricks.atlassian.net/browse/LKB-191).

## Summary of changes

Properly react to the server's cancellation token and take out gate
guards in gRPC request handlers.

Also document cancellation handling. In particular, that Tonic will drop
futures when clients go away (e.g. on timeout or shutdown), so the read
path must be cancellation-safe. It is believed to be (modulo possible
logging noise), but this will be verified later.
2025-07-21 10:52:18 +00:00
Paul Banks
791b5d736b Fixes #10441: control_plane README incorrect neon init args (#12646)
## Problem

As reported in #10441, the `control_plane/README.md` incorrectly
specified that `--pg-version` should be passed to the `cargo neon
init` command. This is not the case and causes an invalid argument
error.

## Summary of changes

Fix the README

## Test Plan

I verified that the steps in the README now work locally. I connected to
the started postgres endpoint and executed some basic metadata queries.
2025-07-18 17:09:20 +00:00
167 changed files with 4504 additions and 4687 deletions

View File

@@ -21,13 +21,14 @@ platforms = [
# "x86_64-apple-darwin",
# "x86_64-pc-windows-msvc",
]
[final-excludes]
workspace-members = [
# vm_monitor benefits from the same Cargo.lock as the rest of our artifacts, but
# it is built primarly in separate repo neondatabase/autoscaling and thus is excluded
# from depending on workspace-hack because most of the dependencies are not used.
"vm_monitor",
# subzero-core is a stub crate that should be excluded from workspace-hack
"subzero-core",
# All of these exist in libs and are not usually built independently.
# Putting workspace hack there adds a bottleneck for cargo builds.
"compute_api",

View File

@@ -0,0 +1,28 @@
name: 'Prepare current job for subzero'
description: >
Set git token to access `neondatabase/subzero` from cargo build,
and set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable to use git CLI
inputs:
token:
description: 'GitHub token with access to neondatabase/subzero'
required: true
runs:
using: "composite"
steps:
- name: Set git token for neondatabase/subzero
uses: pyTooling/Actions/with-post-step@2307b526df64d55e95884e072e49aac2a00a9afa # v5.1.0
env:
SUBZERO_ACCESS_TOKEN: ${{ inputs.token }}
with:
main: |
git config --global url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a
post: |
git config --global --unset url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
- name: Set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable
shell: bash -euxo pipefail {0}
run: echo "CARGO_NET_GIT_FETCH_WITH_CLI=true" >> ${GITHUB_ENV}

View File

@@ -86,6 +86,10 @@ jobs:
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Set pg 14 revision for caching
id: pg_v14_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
@@ -116,7 +120,7 @@ jobs:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ inputs.sanitizers }}
run: |
CARGO_FLAGS="--locked --features testing"
CARGO_FLAGS="--locked --features testing,rest_broker"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_PROFILE=""

View File

@@ -46,6 +46,10 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Cache cargo deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0

View File

@@ -54,6 +54,10 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Install build dependencies
run: |

View File

@@ -632,6 +632,8 @@ jobs:
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-bookworm
DEBIAN_VERSION=bookworm
secrets: |
SUBZERO_ACCESS_TOKEN=${{ secrets.CI_ACCESS_TOKEN }}
provenance: false
push: true
pull: true

View File

@@ -72,6 +72,7 @@ jobs:
check-macos-build:
needs: [ check-permissions, files-changed ]
uses: ./.github/workflows/build-macos.yml
secrets: inherit
with:
pg_versions: ${{ needs.files-changed.outputs.postgres_changes }}
rebuild_rust_code: ${{ fromJSON(needs.files-changed.outputs.rebuild_rust_code) }}

.github/workflows/pgindent.yml (new file, 52 lines changed)

@@ -0,0 +1,52 @@
name: pgindent Neon
on:
push:
branches:
- main
- release
paths:
- 'pgxn/**.[ch]'
- '.github/workflows/pgindent.yml'
pull_request:
paths:
- 'pgxn/**.[ch]'
- '.github/workflows/pgindent.yml'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
jobs:
pgindent:
runs-on: ubuntu-24.04
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
- name: Set pg 17 revision for caching
id: pg_v17_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) >> $GITHUB_OUTPUT
- name: Cache postgres v17 build
id: cache_pg_17
uses: actions/cache@v3
with:
path: pg_install/v17
key: v1-${{ runner.os }}-release-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Run pgindent
run: |
make -s -j neon-pgindent-check
- name: How to fix
if: ${{ failure() }}
run: |
echo Run \"make neon-pgindent\" in the event of a failure

.gitignore (5 lines changed)

@@ -26,9 +26,14 @@ docker-compose/docker-compose-parallel.yml
*.o
*.so
*.Po
*.pid
# pgindent typedef lists
*.list
# Node
**/node_modules/
# various files for local testing
/proxy/.subzero
local_proxy.json

View File

@@ -19,6 +19,7 @@ ln -s ../../pre-commit.py .git/hooks/pre-commit
```
This will run following checks on staged files before each commit:
- `pgindent` over any Neon Postgres extension files
- `rustfmt`
- checks for Python files, see [obligatory checks](/docs/sourcetree.md#obligatory-checks).

Cargo.lock (generated, 220 lines changed)

@@ -52,6 +52,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]]
name = "aligned-vec"
version = "0.6.1"
@@ -490,7 +496,7 @@ dependencies = [
"hex",
"hmac",
"http 0.2.9",
"http 1.1.0",
"http 1.3.1",
"once_cell",
"p256 0.11.1",
"percent-encoding",
@@ -631,7 +637,7 @@ dependencies = [
"aws-smithy-types",
"bytes",
"http 0.2.9",
"http 1.1.0",
"http 1.3.1",
"pin-project-lite",
"tokio",
"tracing",
@@ -649,7 +655,7 @@ dependencies = [
"bytes-utils",
"futures-core",
"http 0.2.9",
"http 1.1.0",
"http 1.3.1",
"http-body 0.4.5",
"http-body 1.0.0",
"http-body-util",
@@ -698,7 +704,7 @@ dependencies = [
"bytes",
"form_urlencoded",
"futures-util",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
@@ -732,7 +738,7 @@ checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733"
dependencies = [
"bytes",
"futures-util",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"http-body-util",
"mime",
@@ -756,7 +762,7 @@ dependencies = [
"form_urlencoded",
"futures-util",
"headers",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"http-body-util",
"mime",
@@ -1090,7 +1096,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "975982cdb7ad6a142be15bdf84aea7ec6a9e5d4d797c004d43185b24cfe4e684"
dependencies = [
"clap",
"heck",
"heck 0.5.0",
"indexmap 2.9.0",
"log",
"proc-macro2",
@@ -1228,7 +1234,7 @@ version = "4.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -1290,8 +1296,14 @@ dependencies = [
name = "communicator"
version = "0.1.0"
dependencies = [
"axum",
"cbindgen",
"neon-shmem",
"http 1.3.1",
"measured",
"tokio",
"tracing",
"tracing-subscriber",
"utils",
"workspace_hack",
]
@@ -1334,7 +1346,10 @@ dependencies = [
"flate2",
"futures",
"hostname-validator",
"http 1.1.0",
"http 1.3.1",
"http-body-util",
"hyper 1.4.1",
"hyper-util",
"indexmap 2.9.0",
"itertools 0.10.5",
"jsonwebtoken",
@@ -1357,6 +1372,7 @@ dependencies = [
"ring",
"rlimit",
"rust-ini",
"scopeguard",
"serde",
"serde_json",
"serde_with",
@@ -1445,7 +1461,7 @@ name = "consumption_metrics"
version = "0.1.0"
dependencies = [
"chrono",
"rand 0.8.5",
"rand 0.9.1",
"serde",
]
@@ -1848,7 +1864,7 @@ dependencies = [
"bytes",
"hex",
"parking_lot 0.12.1",
"rand 0.8.5",
"rand 0.9.1",
"smallvec",
"tracing",
"utils",
@@ -1969,7 +1985,7 @@ checksum = "0892a17df262a24294c382f0d5997571006e7a4348b4327557c4ff1cd4a8bccc"
dependencies = [
"darling",
"either",
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -2093,7 +2109,7 @@ dependencies = [
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"rand 0.9.1",
"remote_storage",
"serde",
"serde_json",
@@ -2661,7 +2677,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http 1.1.0",
"http 1.3.1",
"indexmap 2.9.0",
"slab",
"tokio",
@@ -2743,7 +2759,7 @@ dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http 1.1.0",
"http 1.3.1",
"httpdate",
"mime",
"sha1",
@@ -2755,9 +2771,15 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
dependencies = [
"http 1.1.0",
"http 1.3.1",
]
[[package]]
name = "heck"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "heck"
version = "0.5.0"
@@ -2833,9 +2855,9 @@ dependencies = [
[[package]]
name = "http"
version = "1.1.0"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565"
dependencies = [
"bytes",
"fnv",
@@ -2860,7 +2882,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
dependencies = [
"bytes",
"http 1.1.0",
"http 1.3.1",
]
[[package]]
@@ -2871,7 +2893,7 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
dependencies = [
"bytes",
"futures-util",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"pin-project-lite",
]
@@ -2995,7 +3017,7 @@ dependencies = [
"futures-channel",
"futures-util",
"h2 0.4.4",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"httparse",
"httpdate",
@@ -3028,7 +3050,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
dependencies = [
"futures-util",
"http 1.1.0",
"http 1.3.1",
"hyper 1.4.1",
"hyper-util",
"rustls 0.22.4",
@@ -3060,7 +3082,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"hyper 1.4.1",
"pin-project-lite",
@@ -3709,7 +3731,7 @@ version = "0.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -3770,8 +3792,8 @@ dependencies = [
"once_cell",
"procfs",
"prometheus",
"rand 0.8.5",
"rand_distr 0.4.3",
"rand 0.9.1",
"rand_distr",
"twox-hash",
]
@@ -3863,7 +3885,7 @@ dependencies = [
"lock_api",
"nix 0.30.1",
"rand 0.9.1",
"rand_distr 0.5.1",
"rand_distr",
"rustc-hash 2.1.1",
"tempfile",
"thiserror 1.0.69",
@@ -4160,7 +4182,7 @@ checksum = "10a8a7f5f6ba7c1b286c2fbca0454eaba116f63bbe69ed250b642d36fbb04d80"
dependencies = [
"async-trait",
"bytes",
"http 1.1.0",
"http 1.3.1",
"opentelemetry",
"reqwest",
]
@@ -4173,7 +4195,7 @@ checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76"
dependencies = [
"async-trait",
"futures-core",
"http 1.1.0",
"http 1.3.1",
"opentelemetry",
"opentelemetry-http",
"opentelemetry-proto",
@@ -4252,6 +4274,30 @@ dependencies = [
"winapi",
]
[[package]]
name = "ouroboros"
version = "0.18.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0f050db9c44b97a94723127e6be766ac5c340c48f2c4bb3ffa11713744be59"
dependencies = [
"aliasable",
"ouroboros_macro",
"static_assertions",
]
[[package]]
name = "ouroboros_macro"
version = "0.18.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c7028bdd3d43083f6d8d4d5187680d0d3560d54df4cc9d752005268b41e64d0"
dependencies = [
"heck 0.4.1",
"proc-macro2",
"proc-macro2-diagnostics",
"quote",
"syn 2.0.100",
]
[[package]]
name = "outref"
version = "0.5.1"
@@ -4315,7 +4361,7 @@ dependencies = [
"pageserver_client_grpc",
"pageserver_page_api",
"pprof",
"rand 0.8.5",
"rand 0.9.1",
"reqwest",
"serde",
"serde_json",
@@ -4381,7 +4427,7 @@ dependencies = [
"hashlink",
"hex",
"hex-literal",
"http 1.1.0",
"http 1.3.1",
"http-utils",
"humantime",
"humantime-serde",
@@ -4412,7 +4458,7 @@ dependencies = [
"pprof",
"pq_proto",
"procfs",
"rand 0.8.5",
"rand 0.9.1",
"range-set-blaze",
"regex",
"remote_storage",
@@ -4479,7 +4525,7 @@ dependencies = [
"postgres_ffi_types",
"postgres_versioninfo",
"posthog_client_lite",
"rand 0.8.5",
"rand 0.9.1",
"remote_storage",
"reqwest",
"serde",
@@ -4549,7 +4595,7 @@ dependencies = [
"once_cell",
"pageserver_api",
"pin-project-lite",
"rand 0.8.5",
"rand 0.9.1",
"svg_fmt",
"tokio",
"tracing",
@@ -4922,7 +4968,7 @@ dependencies = [
"fallible-iterator",
"hmac",
"memchr",
"rand 0.8.5",
"rand 0.9.1",
"sha2",
"stringprep",
"tokio",
@@ -5114,7 +5160,7 @@ dependencies = [
"bytes",
"itertools 0.10.5",
"postgres-protocol",
"rand 0.8.5",
"rand 0.9.1",
"serde",
"thiserror 1.0.69",
"tokio",
@@ -5148,6 +5194,19 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "proc-macro2-diagnostics"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
"version_check",
"yansi",
]
[[package]]
name = "procfs"
version = "0.16.0"
@@ -5217,7 +5276,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck",
"heck 0.5.0",
"itertools 0.12.1",
"log",
"multimap",
@@ -5238,7 +5297,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck",
"heck 0.5.0",
"itertools 0.12.1",
"log",
"multimap",
@@ -5334,7 +5393,7 @@ dependencies = [
"hex",
"hmac",
"hostname",
"http 1.1.0",
"http 1.3.1",
"http-body-util",
"http-utils",
"humantime",
@@ -5354,6 +5413,7 @@ dependencies = [
"metrics",
"once_cell",
"opentelemetry",
"ouroboros",
"p256 0.13.2",
"papaya",
"parking_lot 0.12.1",
@@ -5364,8 +5424,9 @@ dependencies = [
"postgres-protocol2",
"postgres_backend",
"pq_proto",
"rand 0.8.5",
"rand_distr 0.4.3",
"rand 0.9.1",
"rand_core 0.6.4",
"rand_distr",
"rcgen",
"redis",
"regex",
@@ -5390,6 +5451,7 @@ dependencies = [
"socket2",
"strum_macros",
"subtle",
"subzero-core",
"thiserror 1.0.69",
"tikv-jemalloc-ctl",
"tikv-jemallocator",
@@ -5566,16 +5628,6 @@ dependencies = [
"getrandom 0.3.3",
]
[[package]]
name = "rand_distr"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31"
dependencies = [
"num-traits",
"rand 0.8.5",
]
[[package]]
name = "rand_distr"
version = "0.5.1"
@@ -5705,14 +5757,14 @@ dependencies = [
[[package]]
name = "regex"
version = "1.10.2"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
[[package]]
@@ -5726,13 +5778,13 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.4.3"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.2",
"regex-syntax 0.8.5",
]
[[package]]
@@ -5749,9 +5801,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.2"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "relative-path"
@@ -5789,7 +5841,7 @@ dependencies = [
"metrics",
"once_cell",
"pin-project-lite",
"rand 0.8.5",
"rand 0.9.1",
"reqwest",
"scopeguard",
"serde",
@@ -5821,7 +5873,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
@@ -5863,7 +5915,7 @@ checksum = "d1ccd3b55e711f91a9885a2fa6fbbb2e39db1776420b062efc058c6410f7e5e3"
dependencies = [
"anyhow",
"async-trait",
"http 1.1.0",
"http 1.3.1",
"reqwest",
"serde",
"thiserror 1.0.69",
@@ -5880,7 +5932,7 @@ dependencies = [
"async-trait",
"futures",
"getrandom 0.2.11",
"http 1.1.0",
"http 1.3.1",
"hyper 1.4.1",
"parking_lot 0.11.2",
"reqwest",
@@ -5901,7 +5953,7 @@ dependencies = [
"anyhow",
"async-trait",
"getrandom 0.2.11",
"http 1.1.0",
"http 1.3.1",
"matchit",
"opentelemetry",
"reqwest",
@@ -6260,7 +6312,7 @@ dependencies = [
"fail",
"futures",
"hex",
"http 1.1.0",
"http 1.3.1",
"http-utils",
"humantime",
"hyper 0.14.30",
@@ -6279,7 +6331,7 @@ dependencies = [
"postgres_versioninfo",
"pprof",
"pq_proto",
"rand 0.8.5",
"rand 0.9.1",
"regex",
"remote_storage",
"reqwest",
@@ -6973,7 +7025,7 @@ dependencies = [
"pageserver_client",
"postgres_connection",
"posthog_client_lite",
"rand 0.8.5",
"rand 0.9.1",
"regex",
"reqwest",
"routerify",
@@ -7109,7 +7161,7 @@ version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"rustversion",
@@ -7122,6 +7174,10 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "subzero-core"
version = "3.0.1"
[[package]]
name = "svg_fmt"
version = "0.4.3"
@@ -7732,7 +7788,7 @@ dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"http-body-util",
"percent-encoding",
@@ -7756,7 +7812,7 @@ dependencies = [
"bytes",
"flate2",
"h2 0.4.4",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
@@ -7847,7 +7903,7 @@ dependencies = [
"base64 0.22.1",
"bitflags 2.8.0",
"bytes",
"http 1.1.0",
"http 1.3.1",
"http-body 1.0.0",
"mime",
"pin-project-lite",
@@ -7868,7 +7924,7 @@ name = "tower-otel"
version = "0.2.0"
source = "git+https://github.com/mattiapenati/tower-otel?rev=56a7321053bcb72443888257b622ba0d43a11fcd#56a7321053bcb72443888257b622ba0d43a11fcd"
dependencies = [
"http 1.1.0",
"http 1.3.1",
"opentelemetry",
"pin-project",
"tower-layer",
@@ -8049,7 +8105,7 @@ dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"http 1.3.1",
"httparse",
"log",
"rand 0.8.5",
@@ -8068,7 +8124,7 @@ dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"http 1.3.1",
"httparse",
"log",
"rand 0.8.5",
@@ -8250,7 +8306,7 @@ dependencies = [
"postgres_connection",
"pprof",
"pq_proto",
"rand 0.8.5",
"rand 0.9.1",
"regex",
"scopeguard",
"sentry",
@@ -8857,8 +8913,8 @@ dependencies = [
"quote",
"rand 0.8.5",
"regex",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
"reqwest",
"rustls 0.23.27",
"rustls-pki-types",
@@ -8954,6 +9010,12 @@ version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd"
[[package]]
name = "yansi"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
[[package]]
name = "yasna"
version = "0.5.2"

View File

@@ -49,6 +49,7 @@ members = [
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"pgxn/neon/communicator",
"proxy/subzero_core",
]
[workspace.package]
@@ -157,7 +158,9 @@ procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13.5"
prost-types = "0.13.5"
rand = "0.8"
rand = "0.9"
# Remove after p256 is updated to 0.14.
rand_core = "=0.6"
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }

View File

@@ -63,7 +63,14 @@ WORKDIR /home/nonroot
COPY --chown=nonroot . .
RUN cargo chef prepare --recipe-path recipe.json
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero" && \
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a; \
fi \
&& cargo chef prepare --recipe-path recipe.json
# Main build image
FROM $REPOSITORY/$IMAGE:$TAG AS build
@@ -71,20 +78,33 @@ WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG ADDITIONAL_RUSTFLAGS=""
ENV CARGO_FEATURES="default"
# 3. Build cargo dependencies. Note that this step doesn't depend on anything other than
# `recipe.json`, so the layer can be reused as long as none of the dependencies change.
COPY --from=plan /home/nonroot/recipe.json recipe.json
RUN set -e \
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"; \
fi \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo chef cook --locked --release --recipe-path recipe.json
# Perform the main build. We reuse the Postgres build artifacts from the intermediate 'pg-build'
# layer, and the cargo dependencies built in the previous step.
COPY --chown=nonroot --from=pg-build /home/nonroot/pg_install/ pg_install
COPY --chown=nonroot . .
COPY --chown=nonroot --from=plan /home/nonroot/proxy/Cargo.toml proxy/Cargo.toml
COPY --chown=nonroot --from=plan /home/nonroot/Cargo.lock Cargo.lock
RUN set -e \
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_FEATURES="rest_broker"; \
fi \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--features $CARGO_FEATURES \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -215,6 +215,10 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
-C $(BUILD_DIR)/pgxn-v17/neon \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile pgindent
# Check whether pgxn/neon code is compliant with pgindent.
.PHONY: neon-pgindent-check
neon-pgindent-check:
$(MAKE) PGINDENT_FLAGS=--silent-diff neon-pgindent
.PHONY: setup-pre-commit-hook
setup-pre-commit-hook:

View File

@@ -27,7 +27,10 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
http-body-util.workspace = true
hostname-validator = "1.1"
hyper.workspace = true
hyper-util.workspace = true
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
@@ -44,6 +47,7 @@ postgres.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
ring = "0.17"
scopeguard.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true

View File

@@ -0,0 +1,98 @@
//! Client for making requests to a running Postgres server's communicator control socket.
//!
//! The storage communicator process that runs inside Postgres exposes an HTTP endpoint in
//! a Unix Domain Socket in the Postgres data directory. This provides access to it.
use std::path::Path;
use anyhow::Context;
use hyper::client::conn::http1::SendRequest;
use hyper_util::rt::TokioIo;
/// Name of the socket within the Postgres data directory. This must match the name in
/// `pgxn/neon/communicator/src/lib.rs`.
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
/// Open a connection to the communicator's control socket, prepare to send requests to it
/// with hyper.
pub async fn connect_communicator_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>>
where
B: hyper::body::Body + 'static + Send,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME);
let socket_path_len = socket_path.display().to_string().len();
// There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a
// Unix Domain socket. The limit is on the connect(2) function used to open the
// socket, not on the absolute path itself. Postgres changes the current directory to
// the data directory and uses a relative path to bind to the socket, and the relative
// path "./neon-communicator.socket" is always short, but when compute_ctl needs to
// open the socket, we need to use a full path, which can be arbitrarily long.
//
// There are a few ways we could work around this:
//
// 1. Change the current directory to the Postgres data directory and use a relative
// path in the connect(2) call. That's problematic because the current directory
// applies to the whole process. We could change the current directory early in
// compute_ctl startup, and that might be a good idea anyway for other reasons too:
// it would be more robust if the data directory is moved around or unlinked for
// some reason, and you would be less likely to accidentally litter other parts of
// the filesystem with e.g. temporary files. However, that's a pretty invasive
// change.
//
// 2. On Linux, you could open() the data directory, and refer to the socket
// inside it as "/proc/self/fd/<fd>/neon-communicator.socket". But that's
// Linux-only.
//
// 3. Create a symbolic link to the socket with a shorter path, and use that.
//
// We use the symbolic link approach here. Hopefully the paths we use in production
// are shorter, so that we can open the socket directly, so that this hack is needed
// only in development.
let connect_result = if socket_path_len < 100 {
// We can open the path directly with no hacks.
tokio::net::UnixStream::connect(socket_path).await
} else {
// The path to the socket is too long. Create a symlink to it with a shorter path.
let short_path = std::env::temp_dir().join(format!(
"compute_ctl.short-socket.{}.{}",
std::process::id(),
tokio::task::id()
));
std::os::unix::fs::symlink(&socket_path, &short_path)?;
// Delete the symlink as soon as we have connected to it. There's a small chance
// of leaking if the process dies before we remove it, so try to keep that window
// as small as possible.
scopeguard::defer! {
if let Err(err) = std::fs::remove_file(&short_path) {
tracing::warn!("could not remove symlink \"{}\" created for socket: {}",
short_path.display(), err);
}
}
tracing::info!(
"created symlink \"{}\" for socket \"{}\", opening it now",
short_path.display(),
socket_path.display()
);
tokio::net::UnixStream::connect(&short_path).await
};
let stream = connect_result.context("connecting to communicator control socket")?;
let io = TokioIo::new(stream);
let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await?;
// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
if let Err(err) = connection.await {
eprintln!("Error in connection: {err}");
}
});
Ok(request_sender)
}
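
The symlink workaround described in the comments above can be tried in isolation with only the standard library. The following is a minimal sketch, not the compute_ctl implementation: it uses the blocking `std::os::unix::net::UnixStream` instead of tokio, and the names `connect_maybe_via_symlink`, `RemoveOnDrop`, `MAX_DIRECT_PATH_LEN` and the symlink naming scheme are assumptions made for illustration.

```rust
use std::io;
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

/// Conservative threshold used by this sketch. The real limit on the socket
/// path length is platform-specific (roughly 100 bytes).
const MAX_DIRECT_PATH_LEN: usize = 100;

/// Per-process counter that keeps symlink names unique (hypothetical scheme).
static NEXT_LINK_ID: AtomicU64 = AtomicU64::new(0);

/// Removes the wrapped path when dropped, including on early return.
struct RemoveOnDrop(PathBuf);

impl Drop for RemoveOnDrop {
    fn drop(&mut self) {
        // Best effort: a leftover symlink is harmless, so ignore errors.
        let _ = std::fs::remove_file(&self.0);
    }
}

/// Connect to a Unix domain socket, going through a short-lived symlink in
/// the temp directory when the path is too long to pass to connect(2).
pub fn connect_maybe_via_symlink(socket_path: &Path) -> io::Result<UnixStream> {
    if socket_path.as_os_str().len() < MAX_DIRECT_PATH_LEN {
        // Short enough: open the path directly.
        return UnixStream::connect(socket_path);
    }

    let id = NEXT_LINK_ID.fetch_add(1, Ordering::Relaxed);
    let short_path =
        std::env::temp_dir().join(format!("short-socket.{}.{}", std::process::id(), id));
    std::os::unix::fs::symlink(socket_path, &short_path)?;

    // The symlink is only needed for the duration of the connect call.
    let _guard = RemoveOnDrop(short_path.clone());
    UnixStream::connect(&short_path)
}
```

Calling `connect_maybe_via_symlink(&pgdata.join("neon-communicator.socket"))` would behave like a plain connect for short paths. The drop guard plays the same role as `scopeguard::defer!` in the diff: the symlink is removed as soon as the connection attempt finishes, whether it succeeded or not.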

View File

@@ -1,10 +1,18 @@
use std::path::Path;
use std::sync::Arc;
use anyhow::Context;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use http_body_util::BodyExt;
use hyper::{Request, StatusCode};
use metrics::proto::MetricFamily;
use metrics::{Encoder, TextEncoder};
use crate::communicator_socket_client::connect_communicator_socket;
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
use crate::metrics::collect;
@@ -31,3 +39,42 @@ pub(in crate::http) async fn get_metrics() -> Response {
.body(Body::from(buffer))
.unwrap()
}
/// Fetch and forward metrics from the Postgres neon extension's metrics
/// exporter that are used by autoscaling-agent.
///
/// The neon extension exposes these metrics over a Unix domain socket
/// in the data directory. That's not accessible directly from the outside
/// world, so we have this endpoint in compute_ctl to expose it.
pub(in crate::http) async fn get_autoscaling_metrics(
State(compute): State<Arc<ComputeNode>>,
) -> Result<Response, Response> {
let pgdata = Path::new(&compute.params.pgdata);
// Connect to the communicator process's metrics socket
let mut metrics_client = connect_communicator_socket(pgdata)
.await
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Make a request for /autoscaling_metrics
let request = Request::builder()
.method("GET")
.uri("/autoscaling_metrics")
.header("Host", "localhost") // hyper requires Host, even though the server won't care
.body(Body::from(""))
.unwrap();
let resp = metrics_client
.send_request(request)
.await
.context("fetching metrics from Postgres metrics service")
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Build a response that just forwards the response we got.
let mut response = Response::builder();
response = response.status(resp.status());
if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
response = response.header(CONTENT_TYPE, content_type);
}
let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream());
Ok(response.body(body).unwrap())
}
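
To make the forwarding step concrete, here is a rough standard-library sketch of what "an HTTP request over a Unix domain socket" amounts to on the wire. It is not how the handler above works (that uses hyper and streams the body); the function name `get_over_unix_socket` is hypothetical, and the sketch assumes the server closes the connection after responding and does not use chunked encoding.

```rust
use std::io::{self, Read, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;

/// Send a minimal HTTP/1.1 GET over a Unix domain socket and return the
/// status code together with the response body.
pub fn get_over_unix_socket(socket_path: &Path, uri: &str) -> io::Result<(u16, String)> {
    let mut stream = UnixStream::connect(socket_path)?;

    // Build the whole request first so it is sent in a single write.
    // "Connection: close" lets us read the response until end-of-file.
    let request = format!("GET {uri} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
    stream.write_all(request.as_bytes())?;

    let mut raw = String::new();
    stream.read_to_string(&mut raw)?;

    // Split the header block from the body at the first empty line.
    let (head, body) = raw
        .split_once("\r\n\r\n")
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing header terminator"))?;

    // The status line looks like "HTTP/1.1 200 OK"; take the second token.
    let status = head
        .split_whitespace()
        .nth(1)
        .and_then(|code| code.parse::<u16>().ok())
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "malformed status line"))?;

    Ok((status, body.to_string()))
}
```

A helper like this is handy for poking at the socket from a test or a debugging session; the production path benefits from hyper because it handles streaming bodies and protocol details that this sketch ignores.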

View File

@@ -81,8 +81,12 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
Server::External {
config, compute_id, ..
} => {
let unauthenticated_router =
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/metrics", get(metrics::get_metrics))
.route(
"/autoscaling_metrics",
get(metrics::get_autoscaling_metrics),
);
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))

View File

@@ -4,6 +4,7 @@
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checker;
pub mod communicator_socket_client;
pub mod config;
pub mod configurator;
pub mod http;

View File

@@ -411,7 +411,8 @@ impl ComputeNode {
.map(|limit| match limit {
0..10 => limit,
10..30 => 10,
30.. => limit / 3,
30..300 => limit / 3,
300.. => 100,
})
// If we didn't find max_connections, default to 10 concurrent connections.
.unwrap_or(10)
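
The effect of the new `300..` arm is easiest to see when the mapping is pulled out into a pure function. This is a sketch only: the function name `concurrency_limit` and the `u32` type are assumptions, and inclusive range patterns are used here so that it compiles on older toolchains as well.

```rust
/// Map Postgres `max_connections` to the number of concurrent connections
/// to use, mirroring the match arms in the hunk above.
pub fn concurrency_limit(max_connections: Option<u32>) -> u32 {
    max_connections
        .map(|limit| match limit {
            0..=9 => limit,        // tiny limits are used as-is
            10..=29 => 10,         // flat 10 for small limits
            30..=299 => limit / 3, // a third of the limit in the mid range
            _ => 100,              // capped at 100 from 300 upwards
        })
        // If max_connections is unknown, default to 10.
        .unwrap_or(10)
}
```

Before the change, a setting such as `max_connections = 3000` would have produced 1000 concurrent connections; with the cap it produces 100. The boundary is continuous: 299 maps to 99 and 300 maps to 100.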

View File

@@ -8,10 +8,10 @@ code changes locally, but not suitable for running production systems.
## Example: Start with Postgres 16
To create and start a local development environment with Postgres 16, you will need to provide the `--pg-version` flag to 3 of the start-up commands.
To create and start a local development environment with Postgres 16, you will need to provide the `--pg-version` flag to 2 of the start-up commands.
```shell
cargo neon init --pg-version 16
cargo neon init
cargo neon start
cargo neon tenant create --set-default --pg-version 16
cargo neon endpoint create main --pg-version 16

View File

@@ -407,6 +407,12 @@ struct StorageControllerStartCmdArgs {
help = "Base port for the storage controller instance identified by instance-id (defaults to pageserver cplane api)"
)]
base_port: Option<u16>,
#[clap(
long,
help = "Whether the storage controller should handle pageserver-reported local disk loss events."
)]
handle_ps_local_disk_loss: Option<bool>,
}
#[derive(clap::Args)]
@@ -1809,6 +1815,7 @@ async fn handle_storage_controller(
instance_id: args.instance_id,
base_port: args.base_port,
start_timeout: args.start_timeout,
handle_ps_local_disk_loss: args.handle_ps_local_disk_loss,
};
if let Err(e) = svc.start(start_args).await {

View File

@@ -65,7 +65,6 @@ use jsonwebtoken::jwk::{
OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse,
};
use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use pem::Pem;
use reqwest::header::CONTENT_TYPE;
use safekeeper_api::PgMajorVersion;
@@ -77,6 +76,7 @@ use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::shard::ShardStripeSize;
use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;

View File

@@ -56,6 +56,7 @@ pub struct NeonStorageControllerStartArgs {
pub instance_id: u8,
pub base_port: Option<u16>,
pub start_timeout: humantime::Duration,
pub handle_ps_local_disk_loss: Option<bool>,
}
impl NeonStorageControllerStartArgs {
@@ -64,6 +65,7 @@ impl NeonStorageControllerStartArgs {
instance_id: 1,
base_port: None,
start_timeout,
handle_ps_local_disk_loss: None,
}
}
}
@@ -669,6 +671,10 @@ impl StorageController {
println!("Starting storage controller at {scheme}://{host}:{listen_port}");
if start_args.handle_ps_local_disk_loss.unwrap_or_default() {
args.push("--handle-ps-local-disk-loss".to_string());
}
background_process::start_process(
COMMAND,
&instance_dir,

View File

@@ -35,6 +35,7 @@ reason = "The paste crate is a build-only dependency with no runtime components.
# More documentation for the licenses section can be found here:
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
[licenses]
version = 2
allow = [
"0BSD",
"Apache-2.0",

View File

@@ -233,7 +233,7 @@ mod tests {
.unwrap()
.as_millis();
use rand::Rng;
let random = rand::thread_rng().r#gen::<u32>();
let random = rand::rng().random::<u32>();
let s3_config = remote_storage::S3Config {
bucket_name: var(REAL_S3_BUCKET).unwrap(),

View File

@@ -90,7 +90,7 @@ impl<'a> IdempotencyKey<'a> {
IdempotencyKey {
now: Utc::now(),
node_id,
nonce: rand::thread_rng().gen_range(0..=9999),
nonce: rand::rng().random_range(0..=9999),
}
}

View File

@@ -41,7 +41,7 @@ impl NodeOs {
/// Generate a random number in range [0, max).
pub fn random(&self, max: u64) -> u64 {
self.internal.rng.lock().gen_range(0..max)
self.internal.rng.lock().random_range(0..max)
}
/// Append a new event to the world event log.

View File

@@ -32,10 +32,10 @@ impl Delay {
/// Generate a random delay in range [min, max]. Return None if the
/// message should be dropped.
pub fn delay(&self, rng: &mut StdRng) -> Option<u64> {
if rng.gen_bool(self.fail_prob) {
if rng.random_bool(self.fail_prob) {
return None;
}
Some(rng.gen_range(self.min..=self.max))
Some(rng.random_range(self.min..=self.max))
}
}

View File

@@ -69,7 +69,7 @@ impl World {
/// Create a new random number generator.
pub fn new_rng(&self) -> StdRng {
let mut rng = self.rng.lock();
StdRng::from_rng(rng.deref_mut()).unwrap()
StdRng::from_rng(rng.deref_mut())
}
/// Create a new node.

View File

@@ -17,5 +17,5 @@ procfs.workspace = true
measured-process.workspace = true
[dev-dependencies]
rand = "0.8"
rand_distr = "0.4.3"
rand.workspace = true
rand_distr = "0.5"

View File

@@ -260,7 +260,7 @@ mod tests {
#[test]
fn test_cardinality_small() {
let (actual, estimate) = test_cardinality(100, Zipf::new(100, 1.2f64).unwrap());
let (actual, estimate) = test_cardinality(100, Zipf::new(100.0, 1.2f64).unwrap());
assert_eq!(actual, [46, 30, 32]);
assert!(51.3 < estimate[0] && estimate[0] < 51.4);
@@ -270,7 +270,7 @@ mod tests {
#[test]
fn test_cardinality_medium() {
let (actual, estimate) = test_cardinality(10000, Zipf::new(10000, 1.2f64).unwrap());
let (actual, estimate) = test_cardinality(10000, Zipf::new(10000.0, 1.2f64).unwrap());
assert_eq!(actual, [2529, 1618, 1629]);
assert!(2309.1 < estimate[0] && estimate[0] < 2309.2);
@@ -280,7 +280,8 @@ mod tests {
#[test]
fn test_cardinality_large() {
let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(1_000_000, 1.2f64).unwrap());
let (actual, estimate) =
test_cardinality(1_000_000, Zipf::new(1_000_000.0, 1.2f64).unwrap());
assert_eq!(actual, [129077, 79579, 79630]);
assert!(126067.2 < estimate[0] && estimate[0] < 126067.3);
@@ -290,7 +291,7 @@ mod tests {
#[test]
fn test_cardinality_small2() {
let (actual, estimate) = test_cardinality(100, Zipf::new(200, 0.8f64).unwrap());
let (actual, estimate) = test_cardinality(100, Zipf::new(200.0, 0.8f64).unwrap());
assert_eq!(actual, [92, 58, 60]);
assert!(116.1 < estimate[0] && estimate[0] < 116.2);
@@ -300,7 +301,7 @@ mod tests {
#[test]
fn test_cardinality_medium2() {
let (actual, estimate) = test_cardinality(10000, Zipf::new(20000, 0.8f64).unwrap());
let (actual, estimate) = test_cardinality(10000, Zipf::new(20000.0, 0.8f64).unwrap());
assert_eq!(actual, [8201, 5131, 5051]);
assert!(6846.4 < estimate[0] && estimate[0] < 6846.5);
@@ -310,7 +311,8 @@ mod tests {
#[test]
fn test_cardinality_large2() {
let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(2_000_000, 0.8f64).unwrap());
let (actual, estimate) =
test_cardinality(1_000_000, Zipf::new(2_000_000.0, 0.8f64).unwrap());
assert_eq!(actual, [777847, 482069, 482246]);
assert!(699437.4 < estimate[0] && estimate[0] < 699437.5);

View File

@@ -16,5 +16,5 @@ rustc-hash.workspace = true
tempfile = "3.14.0"
[dev-dependencies]
rand = "0.9"
rand.workspace = true
rand_distr = "0.5.1"

View File

@@ -596,6 +596,7 @@ pub struct TimelineImportRequest {
pub timeline_id: TimelineId,
pub start_lsn: Lsn,
pub sk_set: Vec<NodeId>,
pub force_upsert: bool,
}
#[derive(serde::Serialize, serde::Deserialize, Clone)]

View File

@@ -981,12 +981,12 @@ mod tests {
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let key = Key {
field1: rng.r#gen(),
field2: rng.r#gen(),
field3: rng.r#gen(),
field4: rng.r#gen(),
field5: rng.r#gen(),
field6: rng.r#gen(),
field1: rng.random(),
field2: rng.random(),
field3: rng.random(),
field4: rng.random(),
field5: rng.random(),
field6: rng.random(),
};
assert_eq!(key, Key::from_str(&format!("{key}")).unwrap());

View File

@@ -443,9 +443,9 @@ pub struct ImportPgdataIdempotencyKey(pub String);
impl ImportPgdataIdempotencyKey {
pub fn random() -> Self {
use rand::Rng;
use rand::distributions::Alphanumeric;
use rand::distr::Alphanumeric;
Self(
rand::thread_rng()
rand::rng()
.sample_iter(&Alphanumeric)
.take(20)
.map(char::from)

View File

@@ -69,22 +69,6 @@ impl Hash for ShardIdentity {
}
}
/// Stripe size in number of pages
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardStripeSize(pub u32);
impl Default for ShardStripeSize {
fn default() -> Self {
DEFAULT_STRIPE_SIZE
}
}
impl std::fmt::Display for ShardStripeSize {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// Layout version: for future upgrades where we might change how the key->shard mapping works
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardLayout(u8);

View File

@@ -21,6 +21,14 @@ pub struct ReAttachRequest {
/// if the node already has a node_id set.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub register: Option<NodeRegisterRequest>,
/// Hadron: Optional flag to indicate whether the node is starting with an empty local disk.
/// Will be set to true if the node couldn't find any local tenant data on startup, could be
/// due to the node starting for the first time or due to a local SSD failure/disk wipe event.
/// The flag may be used by the storage controller to update its observed state of the world
/// to make sure that it sends explicit location_config calls to the node following the
/// re-attach request.
pub empty_local_disk: Option<bool>,
}
#[derive(Serialize, Deserialize, Debug)]

View File

@@ -203,12 +203,12 @@ impl fmt::Display for CancelKeyData {
}
}
use rand::distributions::{Distribution, Standard};
impl Distribution<CancelKeyData> for Standard {
use rand::distr::{Distribution, StandardUniform};
impl Distribution<CancelKeyData> for StandardUniform {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> CancelKeyData {
CancelKeyData {
backend_pid: rng.r#gen(),
cancel_key: rng.r#gen(),
backend_pid: rng.random(),
cancel_key: rng.random(),
}
}
}

View File

@@ -155,10 +155,10 @@ pub struct ScramSha256 {
fn nonce() -> String {
// rand 0.5's ThreadRng is cryptographically secure
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
(0..NONCE_LENGTH)
.map(|_| {
let mut v = rng.gen_range(0x21u8..0x7e);
let mut v = rng.random_range(0x21u8..0x7e);
if v == 0x2c {
v = 0x7e
}

View File

@@ -74,7 +74,6 @@ impl Header {
}
/// An enum representing Postgres backend messages.
#[non_exhaustive]
pub enum Message {
AuthenticationCleartextPassword,
AuthenticationGss,
@@ -145,16 +144,7 @@ impl Message {
PARSE_COMPLETE_TAG => Message::ParseComplete,
BIND_COMPLETE_TAG => Message::BindComplete,
CLOSE_COMPLETE_TAG => Message::CloseComplete,
NOTIFICATION_RESPONSE_TAG => {
let process_id = buf.read_i32::<BigEndian>()?;
let channel = buf.read_cstr()?;
let message = buf.read_cstr()?;
Message::NotificationResponse(NotificationResponseBody {
process_id,
channel,
message,
})
}
NOTIFICATION_RESPONSE_TAG => Message::NotificationResponse(NotificationResponseBody {}),
COPY_DONE_TAG => Message::CopyDone,
COMMAND_COMPLETE_TAG => {
let tag = buf.read_cstr()?;
@@ -543,28 +533,7 @@ impl NoticeResponseBody {
}
}
pub struct NotificationResponseBody {
process_id: i32,
channel: Bytes,
message: Bytes,
}
impl NotificationResponseBody {
#[inline]
pub fn process_id(&self) -> i32 {
self.process_id
}
#[inline]
pub fn channel(&self) -> io::Result<&str> {
get_str(&self.channel)
}
#[inline]
pub fn message(&self) -> io::Result<&str> {
get_str(&self.message)
}
}
pub struct NotificationResponseBody {}
pub struct ParameterDescriptionBody {
storage: Bytes,

View File

@@ -28,7 +28,7 @@ const SCRAM_DEFAULT_SALT_LEN: usize = 16;
/// special characters that would require escaping in an SQL command.
pub async fn scram_sha_256(password: &[u8]) -> String {
let mut salt: [u8; SCRAM_DEFAULT_SALT_LEN] = [0; SCRAM_DEFAULT_SALT_LEN];
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
rng.fill_bytes(&mut salt);
scram_sha_256_salt(password, salt).await
}

View File

@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::cancel_token::RawCancelToken;
use crate::codec::{BackendMessages, FrontendMessage};
use crate::codec::{BackendMessages, FrontendMessage, RecordNotices};
use crate::config::{Host, SslMode};
use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
@@ -221,6 +221,18 @@ impl Client {
&mut self.inner
}
pub fn record_notices(&mut self, limit: usize) -> mpsc::UnboundedReceiver<Box<str>> {
let (tx, rx) = mpsc::unbounded_channel();
let notices = RecordNotices { sender: tx, limit };
self.inner
.sender
.send(FrontendMessage::RecordNotices(notices))
.ok();
rx
}
/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
/// to save a roundtrip
pub async fn query_raw_txt<S, I>(

View File

@@ -3,10 +3,17 @@ use std::io;
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use postgres_protocol2::message::backend;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::codec::{Decoder, Encoder};
pub enum FrontendMessage {
Raw(Bytes),
RecordNotices(RecordNotices),
}
pub struct RecordNotices {
pub sender: UnboundedSender<Box<str>>,
pub limit: usize,
}
pub enum BackendMessage {
@@ -33,14 +40,11 @@ impl FallibleIterator for BackendMessages {
pub struct PostgresCodec;
impl Encoder<FrontendMessage> for PostgresCodec {
impl Encoder<Bytes> for PostgresCodec {
type Error = io::Error;
fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
match item {
FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
}
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> io::Result<()> {
dst.extend_from_slice(&item);
Ok(())
}
}

View File

@@ -1,11 +1,9 @@
use std::net::IpAddr;
use postgres_protocol2::message::backend::Message;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use crate::client::SocketConfig;
use crate::codec::BackendMessage;
use crate::config::Host;
use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
@@ -48,8 +46,8 @@ where
let stream = connect_tls(socket, config.ssl_mode, tls).await?;
let RawConnection {
stream,
parameters,
delayed_notice,
parameters: _,
delayed_notice: _,
process_id,
secret_key,
} = connect_raw(stream, config).await?;
@@ -72,13 +70,7 @@ where
secret_key,
);
// delayed notices are always sent as "Async" messages.
let delayed = delayed_notice
.into_iter()
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
.collect();
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
let connection = Connection::new(stream, conn_tx, conn_rx);
Ok((client, connection))
}

View File

@@ -3,7 +3,7 @@ use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, SinkExt, Stream, TryStreamExt, ready};
use postgres_protocol2::authentication::sasl;
@@ -14,7 +14,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;
use crate::Error;
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::codec::{BackendMessage, BackendMessages, PostgresCodec};
use crate::config::{self, AuthKeys, Config};
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::TlsStream;
@@ -25,7 +25,7 @@ pub struct StartupStream<S, T> {
delayed_notice: Vec<NoticeResponseBody>,
}
impl<S, T> Sink<FrontendMessage> for StartupStream<S, T>
impl<S, T> Sink<Bytes> for StartupStream<S, T>
where
S: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin,
@@ -36,7 +36,7 @@ where
Pin::new(&mut self.inner).poll_ready(cx)
}
fn start_send(mut self: Pin<&mut Self>, item: FrontendMessage) -> io::Result<()> {
fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> io::Result<()> {
Pin::new(&mut self.inner).start_send(item)
}
@@ -120,10 +120,7 @@ where
let mut buf = BytesMut::new();
frontend::startup_message(&config.server_params, &mut buf).map_err(Error::encode)?;
stream
.send(FrontendMessage::Raw(buf.freeze()))
.await
.map_err(Error::io)
stream.send(buf.freeze()).await.map_err(Error::io)
}
async fn authenticate<S, T>(stream: &mut StartupStream<S, T>, config: &Config) -> Result<(), Error>
@@ -191,10 +188,7 @@ where
let mut buf = BytesMut::new();
frontend::password_message(password, &mut buf).map_err(Error::encode)?;
stream
.send(FrontendMessage::Raw(buf.freeze()))
.await
.map_err(Error::io)
stream.send(buf.freeze()).await.map_err(Error::io)
}
async fn authenticate_sasl<S, T>(
@@ -253,10 +247,7 @@ where
let mut buf = BytesMut::new();
frontend::sasl_initial_response(mechanism, scram.message(), &mut buf).map_err(Error::encode)?;
stream
.send(FrontendMessage::Raw(buf.freeze()))
.await
.map_err(Error::io)?;
stream.send(buf.freeze()).await.map_err(Error::io)?;
let body = match stream.try_next().await.map_err(Error::io)? {
Some(Message::AuthenticationSaslContinue(body)) => body,
@@ -272,10 +263,7 @@ where
let mut buf = BytesMut::new();
frontend::sasl_response(scram.message(), &mut buf).map_err(Error::encode)?;
stream
.send(FrontendMessage::Raw(buf.freeze()))
.await
.map_err(Error::io)?;
stream.send(buf.freeze()).await.map_err(Error::io)?;
let body = match stream.try_next().await.map_err(Error::io)? {
Some(Message::AuthenticationSaslFinal(body)) => body,

View File

@@ -1,22 +1,23 @@
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use futures_util::{Sink, Stream, ready};
use postgres_protocol2::message::backend::Message;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, StreamExt, ready};
use postgres_protocol2::message::backend::{Message, NoticeResponseBody};
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tokio_util::sync::PollSender;
use tracing::{info, trace};
use tracing::trace;
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::error::DbError;
use crate::Error;
use crate::codec::{
BackendMessage, BackendMessages, FrontendMessage, PostgresCodec, RecordNotices,
};
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::{AsyncMessage, Error, Notification};
#[derive(PartialEq, Debug)]
enum State {
@@ -33,18 +34,18 @@ enum State {
/// occurred, or because its associated `Client` has dropped and all outstanding work has completed.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<S, T> {
/// HACK: we need this in the Neon Proxy.
pub stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
/// HACK: we need this in the Neon Proxy to forward params.
pub parameters: HashMap<String, String>,
stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
sender: PollSender<BackendMessages>,
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
notices: Option<RecordNotices>,
pending_responses: VecDeque<BackendMessage>,
pending_response: Option<BackendMessages>,
state: State,
}
pub enum Never {}
impl<S, T> Connection<S, T>
where
S: AsyncRead + AsyncWrite + Unpin,
@@ -52,70 +53,42 @@ where
{
pub(crate) fn new(
stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
pending_responses: VecDeque<BackendMessage>,
parameters: HashMap<String, String>,
sender: mpsc::Sender<BackendMessages>,
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
) -> Connection<S, T> {
Connection {
stream,
parameters,
sender: PollSender::new(sender),
receiver,
pending_responses,
notices: None,
pending_response: None,
state: State::Active,
}
}
fn poll_response(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<BackendMessage, Error>>> {
if let Some(message) = self.pending_responses.pop_front() {
trace!("retrying pending response");
return Poll::Ready(Some(Ok(message)));
}
Pin::new(&mut self.stream)
.poll_next(cx)
.map(|o| o.map(|r| r.map_err(Error::io)))
}
/// Read and process messages from the connection to postgres.
/// client <- postgres
fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<Result<AsyncMessage, Error>> {
fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<Result<Never, Error>> {
loop {
let message = match self.poll_response(cx)? {
Poll::Ready(Some(message)) => message,
Poll::Ready(None) => return Poll::Ready(Err(Error::closed())),
Poll::Pending => {
trace!("poll_read: waiting on response");
return Poll::Pending;
}
};
let messages = match message {
BackendMessage::Async(Message::NoticeResponse(body)) => {
let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?;
return Poll::Ready(Ok(AsyncMessage::Notice(error)));
}
BackendMessage::Async(Message::NotificationResponse(body)) => {
let notification = Notification {
process_id: body.process_id(),
channel: body.channel().map_err(Error::parse)?.to_string(),
payload: body.message().map_err(Error::parse)?.to_string(),
let messages = match self.pending_response.take() {
Some(messages) => messages,
None => {
let message = match self.stream.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(Err(Error::closed())),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(Error::io(e))),
Poll::Ready(Some(Ok(message))) => message,
};
return Poll::Ready(Ok(AsyncMessage::Notification(notification)));
match message {
BackendMessage::Async(Message::NoticeResponse(body)) => {
self.handle_notice(body)?;
continue;
}
BackendMessage::Async(_) => continue,
BackendMessage::Normal { messages } => messages,
}
}
BackendMessage::Async(Message::ParameterStatus(body)) => {
self.parameters.insert(
body.name().map_err(Error::parse)?.to_string(),
body.value().map_err(Error::parse)?.to_string(),
);
continue;
}
BackendMessage::Async(_) => unreachable!(),
BackendMessage::Normal { messages } => messages,
};
match self.sender.poll_reserve(cx) {
@@ -126,8 +99,7 @@ where
return Poll::Ready(Err(Error::closed()));
}
Poll::Pending => {
self.pending_responses
.push_back(BackendMessage::Normal { messages });
self.pending_response = Some(messages);
trace!("poll_read: waiting on sender");
return Poll::Pending;
}
@@ -135,6 +107,31 @@ where
}
}
fn handle_notice(&mut self, body: NoticeResponseBody) -> Result<(), Error> {
let Some(notices) = &mut self.notices else {
return Ok(());
};
let mut fields = body.fields();
while let Some(field) = fields.next().map_err(Error::parse)? {
// loop until we find the message field
if field.type_() == b'M' {
// if the message field is within the limit, send it.
if let Some(new_limit) = notices.limit.checked_sub(field.value().len()) {
match notices.sender.send(field.value().into()) {
// set the new limit.
Ok(()) => notices.limit = new_limit,
// closed.
Err(_) => self.notices = None,
}
}
break;
}
}
Ok(())
}
/// Fetch the next client request and enqueue the response sender.
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<FrontendMessage>> {
if self.receiver.is_closed() {
@@ -168,21 +165,23 @@ where
match self.poll_request(cx) {
// send the message to postgres
Poll::Ready(Some(request)) => {
Poll::Ready(Some(FrontendMessage::Raw(request))) => {
Pin::new(&mut self.stream)
.start_send(request)
.map_err(Error::io)?;
}
Poll::Ready(Some(FrontendMessage::RecordNotices(notices))) => {
self.notices = Some(notices)
}
// No more messages from the client, and no more responses to wait for.
// Send a terminate message to postgres
Poll::Ready(None) => {
trace!("poll_write: at eof, terminating");
let mut request = BytesMut::new();
frontend::terminate(&mut request);
let request = FrontendMessage::Raw(request.freeze());
Pin::new(&mut self.stream)
.start_send(request)
.start_send(request.freeze())
.map_err(Error::io)?;
trace!("poll_write: sent eof, closing");
@@ -231,34 +230,17 @@ where
}
}
/// Returns the value of a runtime parameter for this connection.
pub fn parameter(&self, name: &str) -> Option<&str> {
self.parameters.get(name).map(|s| &**s)
}
/// Polls for asynchronous messages from the server.
///
/// The server can send notices as well as notifications asynchronously to the client. Applications that wish to
/// examine those messages should use this method to drive the connection rather than its `Future` implementation.
pub fn poll_message(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<AsyncMessage, Error>>> {
fn poll_message(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Never, Error>>> {
if self.state != State::Closing {
// if the state is still active, try to read from and write to postgres.
let message = self.poll_read(cx)?;
let closing = self.poll_write(cx)?;
if let Poll::Ready(()) = closing {
let Poll::Pending = self.poll_read(cx)?;
if self.poll_write(cx)?.is_ready() {
self.state = State::Closing;
}
if let Poll::Ready(message) = message {
return Poll::Ready(Some(Ok(message)));
}
// poll_read returned Pending.
// poll_write returned Pending or Ready(WriteReady::WaitingOnRead).
// if poll_write returned Ready(WriteReady::WaitingOnRead), then we are waiting to read more data from postgres.
// poll_write returned Pending or Ready(()).
// if poll_write returned Ready(()), then we are waiting to read more data from postgres.
if self.state != State::Closing {
return Poll::Pending;
}
@@ -280,11 +262,9 @@ where
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
while let Some(message) = ready!(self.poll_message(cx)?) {
if let AsyncMessage::Notice(notice) = message {
info!("{}: {}", notice.severity(), notice.message());
}
match self.poll_message(cx)? {
Poll::Ready(None) => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
Poll::Ready(Ok(()))
}
}
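
The `handle_notice` hunk above forwards only the message field of a notice, and only while a byte budget remains. A minimal sketch of that budget logic follows, assuming a standard-library channel in place of the real sender; the names `Notices` and `record_notice` are hypothetical and not taken from the crate.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the notice-recording state in the diff.
struct Notices {
    sender: Sender<Vec<u8>>,
    /// Remaining number of message bytes that may still be forwarded.
    limit: usize,
}

/// Forwards `message` if it fits in the remaining budget.
/// Returns `false` once the receiver is gone, so the caller can drop the state.
fn record_notice(notices: &mut Notices, message: &[u8]) -> bool {
    // `checked_sub` yields `None` when the message exceeds the budget;
    // in that case the notice is skipped and the budget is left untouched.
    if let Some(new_limit) = notices.limit.checked_sub(message.len()) {
        match notices.sender.send(message.to_vec()) {
            // Only charge the budget for messages that were actually sent.
            Ok(()) => notices.limit = new_limit,
            // The receiving side was dropped: stop recording.
            Err(_) => return false,
        }
    }
    true
}
```

An oversized notice does not consume any budget in this sketch, so a later, smaller notice can still be delivered.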

View File

@@ -8,7 +8,6 @@ pub use crate::client::{Client, SocketConfig};
pub use crate::config::Config;
pub use crate::connect_raw::RawConnection;
pub use crate::connection::Connection;
use crate::error::DbError;
pub use crate::error::Error;
pub use crate::generic_client::GenericClient;
pub use crate::query::RowStream;
@@ -93,21 +92,6 @@ impl Notification {
}
}
/// An asynchronous message from the server.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum AsyncMessage {
/// A notice.
///
/// Notices use the same format as errors, but aren't "errors" per-se.
Notice(DbError),
/// A notification.
///
/// Connections can subscribe to notifications with the `LISTEN` command.
Notification(Notification),
}
/// Message returned by the `SimpleQuery` stream.
#[derive(Debug)]
#[non_exhaustive]

View File

@@ -43,7 +43,7 @@ itertools.workspace = true
sync_wrapper = { workspace = true, features = ["futures"] }
byteorder = "1.4"
rand = "0.8.5"
rand.workspace = true
[dev-dependencies]
camino-tempfile.workspace = true

View File

@@ -81,7 +81,7 @@ impl UnreliableWrapper {
///
fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
let mut attempts = self.attempts.lock().unwrap();
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
match attempts.entry(op) {
Entry::Occupied(mut e) => {
@@ -94,7 +94,7 @@ impl UnreliableWrapper {
/* BEGIN_HADRON */
// If there are more attempts to fail, fail the request by probability.
if (attempts_before_this < self.attempts_to_fail)
&& (rng.gen_range(0..=100) < self.attempt_failure_probability)
&& (rng.random_range(0..=100) < self.attempt_failure_probability)
{
let error =
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());

View File

@@ -208,7 +208,7 @@ async fn create_azure_client(
.as_millis();
// because nanos can be the same for two threads so can millis, add randomness
let random = rand::thread_rng().r#gen::<u32>();
let random = rand::rng().random::<u32>();
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::AzureContainer(AzureConfig {

View File

@@ -385,7 +385,7 @@ async fn create_s3_client(
.as_millis();
// because nanos can be the same for two threads so can millis, add randomness
let random = rand::thread_rng().r#gen::<u32>();
let random = rand::rng().random::<u32>();
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::AwsS3(S3Config {

View File

@@ -104,7 +104,7 @@ impl Id {
pub fn generate() -> Self {
let mut tli_buf = [0u8; 16];
rand::thread_rng().fill(&mut tli_buf);
rand::rng().fill(&mut tli_buf);
Id::from(tli_buf)
}

View File

@@ -364,42 +364,37 @@ impl MonotonicCounter<Lsn> for RecordLsn {
}
}
/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
/// Implements [`rand::distr::uniform::UniformSampler`] so we can sample [`Lsn`]s.
///
/// This is used by the `pagebench` pageserver benchmarking tool.
pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
pub struct LsnSampler(<u64 as rand::distr::uniform::SampleUniform>::Sampler);
impl rand::distributions::uniform::SampleUniform for Lsn {
impl rand::distr::uniform::SampleUniform for Lsn {
type Sampler = LsnSampler;
}
impl rand::distributions::uniform::UniformSampler for LsnSampler {
impl rand::distr::uniform::UniformSampler for LsnSampler {
type X = Lsn;
fn new<B1, B2>(low: B1, high: B2) -> Self
fn new<B1, B2>(low: B1, high: B2) -> Result<Self, rand::distr::uniform::Error>
where
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B1: rand::distr::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distr::uniform::SampleBorrow<Self::X> + Sized,
{
Self(
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
low.borrow().0,
high.borrow().0,
),
)
<u64 as rand::distr::uniform::SampleUniform>::Sampler::new(low.borrow().0, high.borrow().0)
.map(Self)
}
fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
fn new_inclusive<B1, B2>(low: B1, high: B2) -> Result<Self, rand::distr::uniform::Error>
where
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B1: rand::distr::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distr::uniform::SampleBorrow<Self::X> + Sized,
{
Self(
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
low.borrow().0,
high.borrow().0,
),
<u64 as rand::distr::uniform::SampleUniform>::Sampler::new_inclusive(
low.borrow().0,
high.borrow().0,
)
.map(Self)
}
fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {

View File

@@ -25,6 +25,12 @@ pub struct ShardIndex {
pub shard_count: ShardCount,
}
/// Stripe size as number of pages.
///
/// NB: don't implement Default, so callers don't lazily use it by mistake. See DEFAULT_STRIPE_SIZE.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardStripeSize(pub u32);
/// Formatting helper, for generating the `shard_id` label in traces.
pub struct ShardSlug<'a>(&'a TenantShardId);
@@ -177,6 +183,12 @@ impl std::fmt::Display for ShardCount {
}
}
impl std::fmt::Display for ShardStripeSize {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::fmt::Display for ShardSlug<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(

View File

@@ -11,7 +11,8 @@ use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{LayerName, PersistentLayerDesc};
use pageserver_api::key::Key;
use pageserver_api::shard::TenantShardId;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::prelude::{SeedableRng, StdRng};
use rand::seq::IndexedRandom;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

View File

@@ -16,10 +16,9 @@ use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}
use crate::retry::Retry;
use crate::split::GetPageSplitter;
use compute_api::spec::PageserverProtocol;
use pageserver_api::shard::ShardStripeSize;
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};
/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when full.
@@ -141,8 +140,8 @@ impl PageserverClient {
if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
return Err(anyhow!(
"can't change stripe size from {} to {}",
old.stripe_size,
shard_spec.stripe_size
old.stripe_size.expect("always Some when sharded"),
shard_spec.stripe_size.expect("always Some when sharded")
));
}
@@ -157,23 +156,6 @@ impl PageserverClient {
Ok(())
}
/// Returns whether a relation exists.
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
pub async fn check_rel_exists(
&self,
req: page_api::CheckRelExistsRequest,
) -> tonic::Result<page_api::CheckRelExistsResponse> {
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Returns the total size of a database, as # of bytes.
#[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
pub async fn get_db_size(
@@ -249,13 +231,15 @@ impl PageserverClient {
// Fast path: request is for a single shard.
if let Some(shard_id) =
GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
.map_err(|err| tonic::Status::internal(err.to_string()))?
{
return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
}
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
// reassemble the responses.
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let mut shard_requests = FuturesUnordered::new();
for (shard_id, shard_req) in splitter.drain_requests() {
@@ -265,10 +249,14 @@ impl PageserverClient {
}
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
splitter.add_response(shard_id, shard_response)?;
splitter
.add_response(shard_id, shard_response)
.map_err(|err| tonic::Status::internal(err.to_string()))?;
}
splitter.get_response()
splitter
.get_response()
.map_err(|err| tonic::Status::internal(err.to_string()))
}
/// Fetches pages on the given shard. Does not retry internally.
@@ -396,12 +384,14 @@ pub struct ShardSpec {
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size for these shards.
stripe_size: ShardStripeSize,
///
/// INVARIANT: None for unsharded tenants, Some for sharded.
stripe_size: Option<ShardStripeSize>,
}
impl ShardSpec {
/// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
/// The stripe size may be omitted for unsharded tenants.
/// The stripe size must be Some for sharded tenants, or None for unsharded tenants.
pub fn new(
urls: HashMap<ShardIndex, String>,
stripe_size: Option<ShardStripeSize>,
@@ -414,11 +404,13 @@ impl ShardSpec {
n => ShardCount::new(n as u8),
};
// Determine the stripe size. It doesn't matter for unsharded tenants.
// Validate the stripe size.
if stripe_size.is_none() && !count.is_unsharded() {
return Err(anyhow!("stripe size must be given for sharded tenants"));
}
let stripe_size = stripe_size.unwrap_or_default();
if stripe_size.is_some() && count.is_unsharded() {
return Err(anyhow!("stripe size can't be given for unsharded tenants"));
}
// Validate the shard spec.
for (shard_id, url) in &urls {
@@ -458,8 +450,10 @@ struct Shards {
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size. Only used for sharded tenants.
stripe_size: ShardStripeSize,
/// The stripe size.
///
/// INVARIANT: None for unsharded tenants, Some for sharded.
stripe_size: Option<ShardStripeSize>,
}
impl Shards {
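
The `ShardSpec::new` change above enforces the invariant in both directions: a stripe size is required for sharded tenants and rejected for unsharded ones. A rough sketch of that check in isolation, assuming a plain `u8` shard count (0 meaning unsharded, as the NB comments state) and `String` errors instead of `anyhow`; `validate_stripe_size` is a hypothetical name.

```rust
/// Checks the invariant "stripe size is None for unsharded tenants, Some for sharded".
/// `shard_count == 0` denotes an unsharded tenant in this sketch.
fn validate_stripe_size(shard_count: u8, stripe_size: Option<u32>) -> Result<(), String> {
    let unsharded = shard_count == 0;
    match (unsharded, stripe_size) {
        // Sharded tenants need a stripe size to place pages.
        (false, None) => Err("stripe size must be given for sharded tenants".to_string()),
        // Unsharded tenants have no striping, so a value would be misleading.
        (true, Some(_)) => Err("stripe size can't be given for unsharded tenants".to_string()),
        // The two remaining combinations satisfy the invariant.
        _ => Ok(()),
    }
}
```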

View File

@@ -1,11 +1,12 @@
use std::collections::HashMap;
use anyhow::anyhow;
use bytes::Bytes;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use pageserver_api::shard::key_to_shard_number;
use pageserver_page_api as page_api;
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
use utils::shard::{ShardCount, ShardIndex, ShardStripeSize};
/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
/// TODO: add tests for this.
@@ -25,43 +26,54 @@ impl GetPageSplitter {
pub fn for_single_shard(
req: &page_api::GetPageRequest,
count: ShardCount,
stripe_size: ShardStripeSize,
) -> Option<ShardIndex> {
stripe_size: Option<ShardStripeSize>,
) -> anyhow::Result<Option<ShardIndex>> {
// Fast path: unsharded tenant.
if count.is_unsharded() {
return Some(ShardIndex::unsharded());
return Ok(Some(ShardIndex::unsharded()));
}
// Find the first page's shard, for comparison. If there are no pages, just return the first
// shard (caller likely checked already, otherwise the server will reject it).
let Some(stripe_size) = stripe_size else {
return Err(anyhow!("stripe size must be given for sharded tenants"));
};
// Find the first page's shard, for comparison.
let Some(&first_page) = req.block_numbers.first() else {
return Some(ShardIndex::new(ShardNumber(0), count));
return Err(anyhow!("no block numbers in request"));
};
let key = rel_block_to_key(req.rel, first_page);
let shard_number = key_to_shard_number(count, stripe_size, &key);
req.block_numbers
Ok(req
.block_numbers
.iter()
.skip(1) // computed above
.all(|&blkno| {
let key = rel_block_to_key(req.rel, blkno);
key_to_shard_number(count, stripe_size, &key) == shard_number
})
.then_some(ShardIndex::new(shard_number, count))
.then_some(ShardIndex::new(shard_number, count)))
}
/// Splits the given request.
pub fn split(
req: page_api::GetPageRequest,
count: ShardCount,
stripe_size: ShardStripeSize,
) -> Self {
stripe_size: Option<ShardStripeSize>,
) -> anyhow::Result<Self> {
// The caller should make sure we don't split requests unnecessarily.
debug_assert!(
Self::for_single_shard(&req, count, stripe_size).is_none(),
Self::for_single_shard(&req, count, stripe_size)?.is_none(),
"unnecessary request split"
);
if count.is_unsharded() {
return Err(anyhow!("unsharded tenant, no point in splitting request"));
}
let Some(stripe_size) = stripe_size else {
return Err(anyhow!("stripe size must be given for sharded tenants"));
};
// Split the requests by shard index.
let mut requests = HashMap::with_capacity(2); // common case
let mut block_shards = Vec::with_capacity(req.block_numbers.len());
@@ -103,11 +115,11 @@ impl GetPageSplitter {
.collect(),
};
Self {
Ok(Self {
requests,
response,
block_shards,
}
})
}
/// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
@@ -124,21 +136,30 @@ impl GetPageSplitter {
&mut self,
shard_id: ShardIndex,
response: page_api::GetPageResponse,
) -> tonic::Result<()> {
) -> anyhow::Result<()> {
// The caller should already have converted status codes into tonic::Status.
if response.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::internal(format!(
return Err(anyhow!(
"unexpected non-OK response for shard {shard_id}: {} {}",
response.status_code,
response.reason.unwrap_or_default()
)));
));
}
if response.request_id != self.response.request_id {
return Err(tonic::Status::internal(format!(
return Err(anyhow!(
"response ID mismatch for shard {shard_id}: expected {}, got {}",
self.response.request_id, response.request_id
)));
self.response.request_id,
response.request_id
));
}
if response.request_id != self.response.request_id {
return Err(anyhow!(
"response ID mismatch for shard {shard_id}: expected {}, got {}",
self.response.request_id,
response.request_id
));
}
// Place the shard response pages into the assembled response, in request order.
@@ -150,27 +171,26 @@ impl GetPageSplitter {
}
let Some(slot) = self.response.pages.get_mut(i) else {
return Err(tonic::Status::internal(format!(
"no block_shards slot {i} for shard {shard_id}"
)));
return Err(anyhow!("no block_shards slot {i} for shard {shard_id}"));
};
let Some(page) = pages.next() else {
return Err(tonic::Status::internal(format!(
return Err(anyhow!(
"missing page {} in shard {shard_id} response",
slot.block_number
)));
));
};
if page.block_number != slot.block_number {
return Err(tonic::Status::internal(format!(
return Err(anyhow!(
"shard {shard_id} returned wrong page at index {i}, expected {} got {}",
slot.block_number, page.block_number
)));
slot.block_number,
page.block_number
));
}
if !slot.image.is_empty() {
return Err(tonic::Status::internal(format!(
return Err(anyhow!(
"shard {shard_id} returned duplicate page {} at index {i}",
slot.block_number
)));
));
}
*slot = page;
@@ -178,10 +198,10 @@ impl GetPageSplitter {
// Make sure we've consumed all pages from the shard response.
if let Some(extra_page) = pages.next() {
return Err(tonic::Status::internal(format!(
return Err(anyhow!(
"shard {shard_id} returned extra page: {}",
extra_page.block_number
)));
));
}
Ok(())
@@ -189,18 +209,18 @@ impl GetPageSplitter {
/// Fetches the final, assembled response.
#[allow(clippy::result_large_err)]
pub fn get_response(self) -> tonic::Result<page_api::GetPageResponse> {
pub fn get_response(self) -> anyhow::Result<page_api::GetPageResponse> {
// Check that the response is complete.
for (i, page) in self.response.pages.iter().enumerate() {
if page.image.is_empty() {
return Err(tonic::Status::internal(format!(
return Err(anyhow!(
"missing page {} for shard {}",
page.block_number,
self.block_shards
.get(i)
.map(|s| s.to_string())
.unwrap_or_else(|| "?".to_string())
)));
));
}
}
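
The reworked `for_single_shard` above now returns an error for a missing stripe size or an empty request, and otherwise reports a shard only when every block maps to the same one. The sketch below mirrors that control flow under a loud assumption: `shard_of` is a hypothetical, simplified round-robin placement and is not the real `key_to_shard_number`. Shard indexes are plain `u8` values here.

```rust
/// Hypothetical, simplified placement: stripes are dealt round-robin across shards.
/// The real mapping works on keys and differs; only the shape of the check matters.
fn shard_of(block: u32, count: u8, stripe_size: u32) -> u8 {
    ((block / stripe_size) % u32::from(count)) as u8
}

/// Returns `Ok(Some(shard))` if all blocks live on one shard, `Ok(None)` if the
/// request straddles shards, and `Err` for invalid input.
fn single_shard(
    blocks: &[u32],
    count: u8,
    stripe_size: Option<u32>,
) -> Result<Option<u8>, String> {
    // Fast path: unsharded tenant (count 0), checked before anything else.
    if count == 0 {
        return Ok(Some(0));
    }
    let Some(stripe_size) = stripe_size else {
        return Err("stripe size must be given for sharded tenants".to_string());
    };
    // Guard added for this sketch only, to avoid dividing by zero in `shard_of`.
    if stripe_size == 0 {
        return Err("stripe size must be nonzero".to_string());
    }
    let Some(&first) = blocks.first() else {
        return Err("no block numbers in request".to_string());
    };
    let shard = shard_of(first, count, stripe_size);
    Ok(blocks
        .iter()
        .skip(1) // the first block was computed above
        .all(|&b| shard_of(b, count, stripe_size) == shard)
        .then_some(shard))
}
```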

View File

@@ -89,7 +89,7 @@ async fn simulate(cmd: &SimulateCmd, results_path: &Path) -> anyhow::Result<()>
let cold_key_range = splitpoint..key_range.end;
for i in 0..cmd.num_records {
let chosen_range = if rand::thread_rng().gen_bool(0.9) {
let chosen_range = if rand::rng().random_bool(0.9) {
&hot_key_range
} else {
&cold_key_range

View File

@@ -300,9 +300,9 @@ impl MockTimeline {
key_range: &Range<Key>,
) -> anyhow::Result<()> {
crate::helpers::union_to_keyspace(&mut self.keyspace, vec![key_range.clone()]);
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
for _ in 0..num_records {
self.ingest_record(rng.gen_range(key_range.clone()), len);
self.ingest_record(rng.random_range(key_range.clone()), len);
self.wal_ingested += len;
}
Ok(())

View File

@@ -4,7 +4,7 @@ use anyhow::Context;
use clap::Parser;
use pageserver_api::key::Key;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::{ShardCount, ShardStripeSize};
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize};
#[derive(Parser)]
pub(super) struct DescribeKeyCommand {
@@ -128,7 +128,9 @@ impl DescribeKeyCommand {
// seeing the sharding placement might be confusing, so leave it out unless shard
// count was given.
let stripe_size = stripe_size.map(ShardStripeSize).unwrap_or_default();
let stripe_size = stripe_size
.map(ShardStripeSize)
.unwrap_or(DEFAULT_STRIPE_SIZE);
println!(
"# placement with shard_count: {} and stripe_size: {}:",
shard_count.0, stripe_size.0
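
Because `ShardStripeSize` deliberately has no `Default` impl, callers such as the one above must name the fallback explicitly. A small sketch of that pattern, using a hypothetical `StripeSize` newtype; the constant's value below is a placeholder chosen for illustration, not the real `DEFAULT_STRIPE_SIZE`.

```rust
use std::fmt;

/// Hypothetical newtype mirroring the shape of `ShardStripeSize`.
/// Note: no `Default` derive, so `unwrap_or_default()` will not compile on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StripeSize(pub u32);

/// Placeholder value for illustration only.
pub const DEFAULT_STRIPE_SIZE: StripeSize = StripeSize(2048);

impl fmt::Display for StripeSize {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the inner integer so width and fill flags are honored.
        fmt::Display::fmt(&self.0, f)
    }
}

/// Wraps an optional raw value, falling back to the explicitly named default.
pub fn resolve_stripe_size(raw: Option<u32>) -> StripeSize {
    raw.map(StripeSize).unwrap_or(DEFAULT_STRIPE_SIZE)
}
```

Spelling out the constant at each call site makes any use of the default visible in review, which appears to be the intent of the NB comment on the type.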

View File

@@ -17,11 +17,11 @@
// grpcurl \
// -plaintext \
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
// -H "neon-shard-id: 0b10" \
// -H "neon-shard-id: 0000" \
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
// -H "authorization: Bearer $JWT" \
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}'
// localhost:51051 page_api.PageService/CheckRelExists
// -d '{"read_lsn": {"request_lsn": 100000000, "not_modified_since_lsn": 1}, "db_oid": 1}' \
// localhost:51051 page_api.PageService/GetDbSize
// ```
//
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
@@ -38,8 +38,8 @@ package page_api;
import "google/protobuf/timestamp.proto";
service PageService {
// Returns whether a relation exists.
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
// NB: unlike libpq, there is no CheckRelExists in gRPC, at the compute team's request. Instead,
// use GetRelSize with allow_missing=true to check existence.
// Fetches a base backup.
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
@@ -97,17 +97,6 @@ message RelTag {
uint32 fork_number = 4;
}
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
// other shards will error.
message CheckRelExistsRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}
message CheckRelExistsResponse {
bool exists = 1;
}
// Requests a base backup.
message GetBaseBackupRequest {
// The LSN to fetch the base backup at. 0 or absent means the latest LSN known to the Pageserver.
@@ -260,10 +249,15 @@ enum GetPageStatusCode {
message GetRelSizeRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
// If true, return missing=true for missing relations instead of a NotFound error.
bool allow_missing = 3;
}
message GetRelSizeResponse {
// The number of blocks in the relation.
uint32 num_blocks = 1;
// If allow_missing=true, this is true for missing relations.
bool missing = 2;
}
// Requests an SLRU segment. Only valid on shard 0, other shards will error.

View File

@@ -69,16 +69,6 @@ impl Client {
Ok(Self { inner })
}
/// Returns whether a relation exists.
pub async fn check_rel_exists(
&mut self,
req: CheckRelExistsRequest,
) -> tonic::Result<CheckRelExistsResponse> {
let req = proto::CheckRelExistsRequest::from(req);
let resp = self.inner.check_rel_exists(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches a base backup.
pub async fn get_base_backup(
&mut self,
@@ -114,7 +104,8 @@ impl Client {
Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into()))))
}
/// Returns the size of a relation, as # of blocks.
/// Returns the size of a relation as # of blocks, or None if allow_missing=true and the
/// relation does not exist.
pub async fn get_rel_size(
&mut self,
req: GetRelSizeRequest,

View File

@@ -139,50 +139,6 @@ impl From<RelTag> for proto::RelTag {
}
}
/// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
#[derive(Clone, Copy, Debug)]
pub struct CheckRelExistsRequest {
pub read_lsn: ReadLsn,
pub rel: RelTag,
}
impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
type Error = ProtocolError;
fn try_from(pb: proto::CheckRelExistsRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
})
}
}
impl From<CheckRelExistsRequest> for proto::CheckRelExistsRequest {
fn from(request: CheckRelExistsRequest) -> Self {
Self {
read_lsn: Some(request.read_lsn.into()),
rel: Some(request.rel.into()),
}
}
}
pub type CheckRelExistsResponse = bool;
impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
fn from(pb: proto::CheckRelExistsResponse) -> Self {
pb.exists
}
}
impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
fn from(exists: CheckRelExistsResponse) -> Self {
Self { exists }
}
}
/// Requests a base backup.
#[derive(Clone, Copy, Debug)]
pub struct GetBaseBackupRequest {
@@ -707,6 +663,8 @@ impl From<GetPageStatusCode> for tonic::Code {
pub struct GetRelSizeRequest {
pub read_lsn: ReadLsn,
pub rel: RelTag,
/// If true, return missing=true for missing relations instead of a NotFound error.
pub allow_missing: bool,
}
impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
@@ -719,6 +677,7 @@ impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
allow_missing: proto.allow_missing,
})
}
}
@@ -728,21 +687,29 @@ impl From<GetRelSizeRequest> for proto::GetRelSizeRequest {
Self {
read_lsn: Some(request.read_lsn.into()),
rel: Some(request.rel.into()),
allow_missing: request.allow_missing,
}
}
}
pub type GetRelSizeResponse = u32;
/// The size of a relation as number of blocks, or None if `allow_missing=true` and the relation
/// does not exist.
///
/// INVARIANT: never None if `allow_missing=false` (returns `NotFound` error instead).
pub type GetRelSizeResponse = Option<u32>;
impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
fn from(proto: proto::GetRelSizeResponse) -> Self {
proto.num_blocks
fn from(pb: proto::GetRelSizeResponse) -> Self {
(!pb.missing).then_some(pb.num_blocks)
}
}
impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
fn from(num_blocks: GetRelSizeResponse) -> Self {
Self { num_blocks }
fn from(resp: GetRelSizeResponse) -> Self {
Self {
num_blocks: resp.unwrap_or_default(),
missing: resp.is_none(),
}
}
}
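
The conversions above encode `Option<u32>` on the wire as a `num_blocks` field plus a `missing` flag. A self-contained sketch of the same round trip, with a hypothetical `WireRelSize` struct standing in for the generated `proto::GetRelSizeResponse`:

```rust
/// Hypothetical stand-in for the generated protobuf message.
#[derive(Debug, Default, PartialEq)]
struct WireRelSize {
    num_blocks: u32,
    missing: bool,
}

/// Domain -> wire: `None` becomes `missing = true` with a zeroed block count.
fn to_wire(resp: Option<u32>) -> WireRelSize {
    WireRelSize {
        num_blocks: resp.unwrap_or_default(),
        missing: resp.is_none(),
    }
}

/// Wire -> domain: the flag, not the block count, decides whether the relation exists.
fn from_wire(pb: WireRelSize) -> Option<u32> {
    (!pb.missing).then_some(pb.num_blocks)
}

/// Existence check built on the size response, as the proto comment suggests
/// for callers that previously used CheckRelExists.
fn rel_exists(resp: Option<u32>) -> bool {
    resp.is_some()
}
```

The separate flag matters because an existing but empty relation has `num_blocks = 0`; without `missing`, it could not be told apart from a relation that does not exist.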

View File

@@ -188,9 +188,9 @@ async fn main_impl(
start_work_barrier.wait().await;
loop {
let (timeline, work) = {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
let target = all_targets.choose(&mut rng).unwrap();
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
let lsn = target.lsn_range.clone().map(|r| rng.random_range(r));
(target.timeline, Work { lsn })
};
let sender = work_senders.get(&timeline).unwrap();

View File

@@ -326,8 +326,7 @@ async fn main_impl(
.cloned()
.collect();
let weights =
rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
.unwrap();
rand::distr::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len())).unwrap();
Box::pin(async move {
let scheme = match Url::parse(&args.page_service_connstring) {
@@ -427,7 +426,7 @@ async fn run_worker(
cancel: CancellationToken,
rps_period: Option<Duration>,
ranges: Vec<KeyRange>,
weights: rand::distributions::weighted::WeightedIndex<i128>,
weights: rand::distr::weighted::WeightedIndex<i128>,
) {
shared_state.start_work_barrier.wait().await;
let client_start = Instant::now();
@@ -469,9 +468,9 @@ async fn run_worker(
}
// Pick a random page from a random relation.
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key: i128 = rng.random_range(r.start..r.end);
let (rel_tag, block_no) = key_to_block(key);
let mut blks = VecDeque::with_capacity(batch_size);
@@ -502,7 +501,7 @@ async fn run_worker(
// We assume that the entire batch can fit within the relation.
assert_eq!(blks.len(), batch_size, "incomplete batch");
let req_lsn = if rng.gen_bool(args.req_latest_probability) {
let req_lsn = if rng.random_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn

View File

@@ -7,7 +7,7 @@ use std::time::{Duration, Instant};
use pageserver_api::models::HistoricLayerInfo;
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use rand::seq::SliceRandom;
use rand::seq::IndexedMutRandom;
use tokio::sync::{OwnedSemaphorePermit, mpsc};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -260,7 +260,7 @@ async fn timeline_actor(
loop {
let layer_tx = {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
timeline.layers.choose_mut(&mut rng).expect("no layers")
};
match layer_tx.try_send(permit.take().unwrap()) {

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::net::IpAddr;
use futures::Future;
use pageserver_api::config::NodeMetadata;
@@ -16,7 +17,7 @@ use tokio_util::sync::CancellationToken;
use url::Url;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::{backoff, failpoint_support};
use utils::{backoff, failpoint_support, ip_address};
use crate::config::PageServerConf;
use crate::virtual_file::on_fatal_io_error;
@@ -27,6 +28,7 @@ pub struct StorageControllerUpcallClient {
http_client: reqwest::Client,
base_url: Url,
node_id: NodeId,
node_ip_addr: Option<IpAddr>,
cancel: CancellationToken,
}
@@ -40,6 +42,7 @@ pub trait StorageControllerUpcallApi {
fn re_attach(
&self,
conf: &PageServerConf,
empty_local_disk: bool,
) -> impl Future<
Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
> + Send;
@@ -91,11 +94,18 @@ impl StorageControllerUpcallClient {
);
}
// Intentionally panics if we encountered any errors parsing or reading the IP address.
// Note that if the required environment variable is not set, `read_node_ip_addr_from_env` returns `Ok(None)`
// instead of an error.
let node_ip_addr =
ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");
Self {
http_client: client.build().expect("Failed to construct HTTP client"),
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),
node_ip_addr,
}
}
@@ -146,6 +156,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
async fn re_attach(
&self,
conf: &PageServerConf,
empty_local_disk: bool,
) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
let url = self
.base_url
@@ -193,8 +204,8 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
listen_https_port: m.https_port,
node_ip_addr: self.node_ip_addr,
availability_zone_id: az_id.expect("Checked above"),
node_ip_addr: None,
})
}
Err(e) => {
@@ -217,6 +228,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
let request = ReAttachRequest {
node_id: self.node_id,
register: register.clone(),
empty_local_disk: Some(empty_local_disk),
};
let response: ReAttachResponse = self

View File

@@ -768,6 +768,7 @@ mod test {
async fn re_attach(
&self,
_conf: &PageServerConf,
_empty_local_disk: bool,
) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
unimplemented!()
}

View File

@@ -155,7 +155,7 @@ impl FeatureResolver {
);
let tenant_properties = PerTenantProperties {
remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
remote_size_mb: Some(rand::rng().random_range(100.0..1000000.00)),
}
.into_posthog_properties();

@@ -1636,9 +1636,10 @@ impl PageServerHandler {
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
(
vec![
Self::handle_get_nblocks_request(&shard, &req, &ctx)
Self::handle_get_nblocks_request(&shard, &req, false, &ctx)
.instrument(span.clone())
.await
.map(|msg| msg.expect("allow_missing=false"))
.map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
@@ -2303,12 +2304,16 @@ impl PageServerHandler {
Ok(PagestreamExistsResponse { req: *req, exists })
}
/// If `allow_missing` is true, returns None instead of Err on missing relations. Otherwise,
/// never returns None. It is only supported by the gRPC protocol, so we pass it separately to
/// avoid changing the libpq protocol types.
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_nblocks_request(
timeline: &Timeline,
req: &PagestreamNblocksRequest,
allow_missing: bool,
ctx: &RequestContext,
) -> Result<PagestreamNblocksResponse, PageStreamError> {
) -> Result<Option<PagestreamNblocksResponse>, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -2320,20 +2325,25 @@ impl PageServerHandler {
.await?;
let n_blocks = timeline
.get_rel_size(
.get_rel_size_in_reldir(
req.rel,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
None,
allow_missing,
ctx,
)
.await?;
let Some(n_blocks) = n_blocks else {
return Ok(None);
};
Ok(PagestreamNblocksResponse {
Ok(Some(PagestreamNblocksResponse {
req: *req,
n_blocks,
})
}))
}
#[instrument(skip_all, fields(shard_id))]
@@ -3218,13 +3228,25 @@ where
pub struct GrpcPageServiceHandler {
tenant_manager: Arc<TenantManager>,
ctx: RequestContext,
/// Cancelled to shut down the server. Tonic will shut down in response to this, but wait for
/// in-flight requests to complete. Any tasks we spawn ourselves must respect this token.
cancel: CancellationToken,
/// Any tasks we spawn ourselves should clone this gate guard, so that we can wait for them to
/// complete during shutdown. Request handlers implicitly hold this guard already.
gate_guard: GateGuard,
/// `get_vectored` concurrency setting.
get_vectored_concurrent_io: GetVectoredConcurrentIo,
}
impl GrpcPageServiceHandler {
/// Spawns a gRPC server for the page service.
///
/// Returns a `CancellableTask` handle that can be used to shut down the server. It waits for
/// any in-flight requests and tasks to complete first.
///
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn(
@@ -3234,12 +3256,15 @@ impl GrpcPageServiceHandler {
get_vectored_concurrent_io: GetVectoredConcurrentIo,
listener: std::net::TcpListener,
) -> anyhow::Result<CancellableTask> {
// Set up a cancellation token for shutting down the server, and a gate to wait for all
// requests and spawned tasks to complete.
let cancel = CancellationToken::new();
let gate = Gate::default();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch)
.detached_child();
let gate = Gate::default();
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
// port early during startup.
@@ -3270,6 +3295,7 @@ impl GrpcPageServiceHandler {
let page_service_handler = GrpcPageServiceHandler {
tenant_manager,
ctx,
cancel: cancel.clone(),
gate_guard: gate.enter().expect("gate was just created"),
get_vectored_concurrent_io,
};
@@ -3306,19 +3332,20 @@ impl GrpcPageServiceHandler {
.build_v1()?;
let server = server.add_service(reflection_service);
// Spawn server task.
// Spawn server task. It runs until the cancellation token fires and in-flight requests and
// tasks complete. The `CancellableTask` will wait for the task's join handle, which
// implicitly waits for the gate to close.
let task_cancel = cancel.clone();
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"grpc listener",
"grpc pageservice listener",
async move {
let result = server
server
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
.await;
if result.is_ok() {
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
}
result
.await?;
// Server exited cleanly. All requests should have completed by now. Wait for any
// spawned tasks to complete as well (e.g. IoConcurrency sidecars) via the gate.
gate.close().await;
anyhow::Ok(())
},
));
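Review note: the shutdown sequence in this hunk relies on a gate that stays open while any guard is alive, so `gate.close().await` only returns after spawned tasks have dropped their guards. The real `Gate`/`GateGuard` types are async and are not shown here; the sketch below is a blocking, standard-library stand-in that models only the "close waits for all guards" behaviour. `SketchGate`, `SketchGuard`, and `close_waits_for_task` are hypothetical names, and the sketch does not model `try_clone` failing once shutdown has begun.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Illustrative stand-in for a gate; NOT the API used in the diff.
struct SketchGate {
    tx: Sender<()>,
    rx: Receiver<()>,
}

/// Holding a guard keeps the gate open; dropping it releases the hold.
#[derive(Clone)]
struct SketchGuard {
    _hold: Sender<()>,
}

impl SketchGate {
    fn new() -> Self {
        let (tx, rx) = channel();
        Self { tx, rx }
    }

    fn enter(&self) -> SketchGuard {
        SketchGuard { _hold: self.tx.clone() }
    }

    /// Blocks until every guard has been dropped.
    fn close(self) {
        let SketchGate { tx, rx } = self;
        drop(tx); // release the gate's own sender
        // No message is ever sent, so `recv` returns only once all senders
        // (that is, all guards) are gone.
        let _ = rx.recv();
    }
}

/// Returns true if the background task finished before `close` returned.
fn close_waits_for_task() -> bool {
    let gate = SketchGate::new();
    let guard = gate.enter();
    let done = Arc::new(AtomicBool::new(false));
    let done_in_task = Arc::clone(&done);
    let handle = thread::spawn(move || {
        let _guard = guard; // keep the gate open until the task completes
        thread::sleep(Duration::from_millis(50));
        done_in_task.store(true, Ordering::SeqCst);
    });
    gate.close();
    let finished = done.load(Ordering::SeqCst);
    handle.join().unwrap();
    finished
}
```

The `let _guard = guard;` line mirrors `let _gate_guard = gate_guard;` in the basebackup task: moving the guard into the task body ties its lifetime to the task rather than to the request handler.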
@@ -3508,7 +3535,10 @@ impl GrpcPageServiceHandler {
/// Implements the gRPC page service.
///
/// TODO: cancellation.
/// On client disconnect (e.g. timeout or client shutdown), Tonic will drop the request handler
/// futures, so the read path must be cancellation-safe. On server shutdown, Tonic will wait for
/// in-flight requests to complete.
///
/// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code.
#[tonic::async_trait]
impl proto::PageService for GrpcPageServiceHandler {
@@ -3519,39 +3549,6 @@ impl proto::PageService for GrpcPageServiceHandler {
type GetPagesStream =
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
#[instrument(skip_all, fields(rel, lsn))]
async fn check_rel_exists(
&self,
req: tonic::Request<proto::CheckRelExistsRequest>,
) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
let received_at = extract::<ReceivedAt>(&req).0;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request, decorate the span, and convert it to a Pagestream request.
Self::ensure_shard_zero(&timeline)?;
let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?;
span_record!(rel=%req.rel, lsn=%req.read_lsn);
let req = PagestreamExistsRequest {
hdr: Self::make_hdr(req.read_lsn, None),
rel: req.rel,
};
// Execute the request and convert the response.
let _timer = Self::record_op_start_and_throttle(
&timeline,
metrics::SmgrQueryType::GetRelExists,
received_at,
)
.await?;
let resp = PageServerHandler::handle_get_rel_exists_request(&timeline, &req, &ctx).await?;
let resp: page_api::CheckRelExistsResponse = resp.exists;
Ok(tonic::Response::new(resp.into()))
}
#[instrument(skip_all, fields(lsn))]
async fn get_base_backup(
&self,
@@ -3593,8 +3590,14 @@ impl proto::PageService for GrpcPageServiceHandler {
// Spawn a task to run the basebackup.
let span = Span::current();
let gate_guard = self
.gate_guard
.try_clone()
.map_err(|_| tonic::Status::unavailable("shutting down"))?;
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
let jh = tokio::spawn(async move {
let _gate_guard = gate_guard; // keep gate open until task completes
let gzip_level = match req.compression {
page_api::BaseBackupCompression::None => None,
// NB: using fast compression because it's on the critical path for compute
@@ -3718,15 +3721,17 @@ impl proto::PageService for GrpcPageServiceHandler {
.await?;
// Spawn an IoConcurrency sidecar, if enabled.
let Ok(gate_guard) = self.gate_guard.try_clone() else {
return Err(tonic::Status::unavailable("shutting down"));
};
let gate_guard = self
.gate_guard
.try_clone()
.map_err(|_| tonic::Status::unavailable("shutting down"))?;
let io_concurrency =
IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard);
// Spawn a task to handle the GetPageRequest stream.
// Construct the GetPageRequest stream handler.
let span = Span::current();
let ctx = self.ctx.attached_child();
let cancel = self.cancel.clone();
let mut reqs = req.into_inner();
let resps = async_stream::try_stream! {
@@ -3734,7 +3739,19 @@ impl proto::PageService for GrpcPageServiceHandler {
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
.await?
.downgrade();
while let Some(req) = reqs.message().await? {
loop {
// NB: Tonic considers the entire stream to be an in-flight request and will wait
// for it to complete before shutting down. React to cancellation between requests.
let req = tokio::select! {
biased;
_ = cancel.cancelled() => Err(tonic::Status::unavailable("shutting down")),
result = reqs.message() => match result {
Ok(Some(req)) => Ok(req),
Ok(None) => break, // client closed the stream
Err(err) => Err(err),
},
}?;
let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default();
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
.instrument(span.clone()) // propagate request span
@@ -3758,7 +3775,7 @@ impl proto::PageService for GrpcPageServiceHandler {
Ok(tonic::Response::new(Box::pin(resps)))
}
#[instrument(skip_all, fields(rel, lsn))]
#[instrument(skip_all, fields(rel, lsn, allow_missing))]
async fn get_rel_size(
&self,
req: tonic::Request<proto::GetRelSizeRequest>,
@@ -3770,8 +3787,9 @@ impl proto::PageService for GrpcPageServiceHandler {
// Validate the request, decorate the span, and convert it to a Pagestream request.
Self::ensure_shard_zero(&timeline)?;
let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?;
let allow_missing = req.allow_missing;
span_record!(rel=%req.rel, lsn=%req.read_lsn);
span_record!(rel=%req.rel, lsn=%req.read_lsn, allow_missing=%req.allow_missing);
let req = PagestreamNblocksRequest {
hdr: Self::make_hdr(req.read_lsn, None),
@@ -3786,8 +3804,11 @@ impl proto::PageService for GrpcPageServiceHandler {
)
.await?;
let resp = PageServerHandler::handle_get_nblocks_request(&timeline, &req, &ctx).await?;
let resp: page_api::GetRelSizeResponse = resp.n_blocks;
let resp =
PageServerHandler::handle_get_nblocks_request(&timeline, &req, allow_missing, &ctx)
.await?;
let resp: page_api::GetRelSizeResponse = resp.map(|resp| resp.n_blocks);
Ok(tonic::Response::new(resp.into()))
}

@@ -286,6 +286,10 @@ impl Timeline {
/// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
///
/// The ordering of the returned vec corresponds to the ordering of `pages`.
///
/// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
/// if the client goes away (e.g. due to timeout or cancellation).
/// TODO: verify that it actually is cancellation-safe.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
@@ -500,8 +504,9 @@ impl Timeline {
for rel in rels {
let n_blocks = self
.get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), ctx)
.await?;
.get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), false, ctx)
.await?
.expect("allow_missing=false");
total_blocks += n_blocks as usize;
}
Ok(total_blocks)
@@ -517,10 +522,16 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
self.get_rel_size_in_reldir(tag, version, None, ctx).await
Ok(self
.get_rel_size_in_reldir(tag, version, None, false, ctx)
.await?
.expect("allow_missing=false"))
}
/// Get size of a relation file. The relation must exist, otherwise an error is returned.
/// Get size of a relation file. If `allow_missing` is true, returns None for missing relations,
/// otherwise errors.
///
/// INVARIANT: never returns None if `allow_missing=false`.
///
/// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
pub(crate) async fn get_rel_size_in_reldir(
@@ -528,8 +539,9 @@ impl Timeline {
tag: RelTag,
version: Version<'_>,
deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
allow_missing: bool,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
) -> Result<Option<BlockNumber>, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -537,7 +549,15 @@ impl Timeline {
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
return Ok(nblocks);
return Ok(Some(nblocks));
}
if allow_missing
&& !self
.get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
.await?
{
return Ok(None);
}
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
@@ -549,7 +569,7 @@ impl Timeline {
// FSM, and smgrnblocks() on it immediately afterwards,
// without extending it. Tolerate that by claiming that
// any non-existent FSM fork has size 0.
return Ok(0);
return Ok(Some(0));
}
let key = rel_size_to_key(tag);
@@ -558,7 +578,7 @@ impl Timeline {
self.update_cached_rel_size(tag, version, nblocks);
Ok(nblocks)
Ok(Some(nblocks))
}
/// Does the relation exist?
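Review note: the `allow_missing` change turns the return type into `Result<Option<_>, _>` while keeping strict callers unchanged through `.expect("allow_missing=false")`. The sketch below shows that shape in isolation, assuming a plain `HashMap` as a stand-in for the relation directory; `size_of`, `size_of_strict`, and `LookupError` are hypothetical names, not pageserver types.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum LookupError {
    Missing(u32),
}

/// Returns `Ok(None)` for a missing entry only when `allow_missing` is true.
/// INVARIANT: never returns `Ok(None)` if `allow_missing` is false.
fn size_of(
    sizes: &HashMap<u32, u32>,
    rel: u32,
    allow_missing: bool,
) -> Result<Option<u32>, LookupError> {
    match sizes.get(&rel) {
        Some(n) => Ok(Some(*n)),
        None if allow_missing => Ok(None),
        None => Err(LookupError::Missing(rel)),
    }
}

/// Strict wrapper for callers that require the entry to exist. The `expect`
/// cannot fire as long as `size_of` upholds its invariant.
fn size_of_strict(sizes: &HashMap<u32, u32>, rel: u32) -> Result<u32, LookupError> {
    Ok(size_of(sizes, rel, false)?.expect("allow_missing=false"))
}
```

The design keeps one lookup implementation and pushes the policy decision to the caller: only the gRPC path passes `allow_missing=true`, while existing callers keep their non-optional return type.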
@@ -2908,9 +2928,8 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
mod tests {
use hex_literal::hex;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::ShardStripeSize;
use utils::id::TimelineId;
use utils::shard::{ShardCount, ShardNumber};
use utils::shard::{ShardCount, ShardNumber, ShardStripeSize};
use super::*;
use crate::DEFAULT_PG_VERSION;

@@ -6161,11 +6161,11 @@ mod tests {
use pageserver_api::keyspace::KeySpaceRandomAccum;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings, LsnLease};
use pageserver_compaction::helpers::overlaps_with;
use rand::Rng;
#[cfg(feature = "testing")]
use rand::SeedableRng;
#[cfg(feature = "testing")]
use rand::rngs::StdRng;
use rand::{Rng, thread_rng};
#[cfg(feature = "testing")]
use std::ops::Range;
use storage_layer::{IoConcurrency, PersistentLayerKey};
@@ -6286,8 +6286,8 @@ mod tests {
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
let gap = random.gen_range(1..=100) <= spec.gap_chance;
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
let gap = random.random_range(1..=100) <= spec.gap_chance;
let will_init = random.random_range(1..=100) <= spec.will_init_chance;
if gap {
continue;
@@ -6330,8 +6330,8 @@ mod tests {
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
let gap = random.gen_range(1..=100) <= spec.gap_chance;
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
let gap = random.random_range(1..=100) <= spec.gap_chance;
let will_init = random.random_range(1..=100) <= spec.will_init_chance;
if gap {
continue;
@@ -7808,7 +7808,7 @@ mod tests {
for _ in 0..50 {
for _ in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
let blknum = rand::rng().random_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let mut writer = tline.writer().await;
writer
@@ -7897,7 +7897,7 @@ mod tests {
for _ in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
let blknum = rand::rng().random_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let mut writer = tline.writer().await;
writer
@@ -7965,7 +7965,7 @@ mod tests {
for _ in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
let blknum = rand::rng().random_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let mut writer = tline.writer().await;
writer
@@ -8229,7 +8229,7 @@ mod tests {
for _ in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
let blknum = rand::rng().random_range(0..NUM_KEYS);
test_key.field6 = (blknum * STEP) as u32;
let mut writer = tline.writer().await;
writer
@@ -8502,7 +8502,7 @@ mod tests {
for iter in 1..=10 {
for _ in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
let blknum = rand::rng().random_range(0..NUM_KEYS);
test_key.field6 = (blknum * STEP) as u32;
let mut writer = tline.writer().await;
writer
@@ -11291,10 +11291,10 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_read_path() -> anyhow::Result<()> {
use rand::seq::SliceRandom;
use rand::seq::IndexedRandom;
let seed = if cfg!(feature = "fuzz-read-path") {
let seed: u64 = thread_rng().r#gen();
let seed: u64 = rand::rng().random();
seed
} else {
// Use a hard-coded seed when not in fuzzing mode.
@@ -11308,8 +11308,8 @@ mod tests {
let (queries, will_init_chance, gap_chance) = if cfg!(feature = "fuzz-read-path") {
const QUERIES: u64 = 5000;
let will_init_chance: u8 = random.gen_range(0..=10);
let gap_chance: u8 = random.gen_range(0..=50);
let will_init_chance: u8 = random.random_range(0..=10);
let gap_chance: u8 = random.random_range(0..=50);
(QUERIES, will_init_chance, gap_chance)
} else {
@@ -11410,7 +11410,8 @@ mod tests {
while used_keys.len() < tenant.conf.max_get_vectored_keys.get() {
let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty");
let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE));
let mut selected_key =
start_key.add(random.random_range(0..KEY_DIMENSION_SIZE));
while used_keys.len() < tenant.conf.max_get_vectored_keys.get() {
if used_keys.contains(&selected_key)
@@ -11425,7 +11426,7 @@ mod tests {
.add_key(selected_key);
used_keys.insert(selected_key);
let pick_next = random.gen_range(0..=100) <= PICK_NEXT_CHANCE;
let pick_next = random.random_range(0..=100) <= PICK_NEXT_CHANCE;
if pick_next {
selected_key = selected_key.next();
} else {

@@ -535,8 +535,8 @@ pub(crate) mod tests {
}
pub(crate) fn random_array(len: usize) -> Vec<u8> {
let mut rng = rand::thread_rng();
(0..len).map(|_| rng.r#gen()).collect::<_>()
let mut rng = rand::rng();
(0..len).map(|_| rng.random()).collect::<_>()
}
#[tokio::test]
@@ -588,9 +588,9 @@ pub(crate) mod tests {
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let blobs = (0..1024)
.map(|_| {
let mut sz: u16 = rng.r#gen();
let mut sz: u16 = rng.random();
// Make 50% of the arrays small
if rng.r#gen() {
if rng.random() {
sz &= 63;
}
random_array(sz.into())

@@ -1090,7 +1090,7 @@ pub(crate) mod tests {
const NUM_KEYS: usize = 100000;
let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
for idx in 0..NUM_KEYS {
let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
let u: f64 = rand::rng().random_range(0.0..1.0);
let t = -(f64::ln(u));
let key_int = (t * 1000000.0) as u128;
@@ -1116,7 +1116,7 @@ pub(crate) mod tests {
// Test get() operations on random keys, most of which will not exist
for _ in 0..100000 {
let key_int = rand::thread_rng().r#gen::<u128>();
let key_int = rand::rng().random::<u128>();
let search_key = u128::to_be_bytes(key_int);
assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
}

@@ -508,8 +508,8 @@ mod tests {
let write_nbytes = cap * 2 + cap / 2;
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
let content: Vec<u8> = rand::rng()
.sample_iter(rand::distr::StandardUniform)
.take(write_nbytes)
.collect();
@@ -565,8 +565,8 @@ mod tests {
let cap = writer.mutable().capacity();
drop(writer);
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
let content: Vec<u8> = rand::rng()
.sample_iter(rand::distr::StandardUniform)
.take(cap * 2 + cap / 2)
.collect();
@@ -614,8 +614,8 @@ mod tests {
let cap = mutable.capacity();
let align = mutable.align();
drop(writer);
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
let content: Vec<u8> = rand::rng()
.sample_iter(rand::distr::StandardUniform)
.take(cap * 2 + cap / 2)
.collect();

@@ -19,7 +19,7 @@ use pageserver_api::shard::{
};
use pageserver_api::upcall_api::ReAttachResponseTenant;
use rand::Rng;
use rand::distributions::Alphanumeric;
use rand::distr::Alphanumeric;
use remote_storage::TimeoutOrCancel;
use sysinfo::SystemExt;
use tokio::fs;
@@ -218,7 +218,7 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
std::io::ErrorKind::InvalidInput,
"Path must be absolute",
))?;
let rand_suffix = rand::thread_rng()
let rand_suffix = rand::rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
@@ -328,7 +328,7 @@ fn emergency_generations(
LocationMode::Attached(alc) => TenantStartupMode::Attached((
alc.attach_mode,
alc.generation,
ShardStripeSize::default(),
lc.shard.stripe_size,
)),
LocationMode::Secondary(_) => TenantStartupMode::Secondary,
},
@@ -352,7 +352,8 @@ async fn init_load_generations(
let client = StorageControllerUpcallClient::new(conf, cancel);
info!("Calling {} API to re-attach tenants", client.base_url());
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach(conf).await {
let empty_local_disk = tenant_confs.is_empty();
match client.re_attach(conf, empty_local_disk).await {
Ok(tenants) => tenants
.into_iter()
.flat_map(|(id, rart)| {

@@ -1,8 +1,8 @@
use chrono::NaiveDateTime;
use pageserver_api::shard::ShardStripeSize;
use serde::{Deserialize, Serialize};
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::ShardStripeSize;
/// Tenant shard manifest, stored in remote storage. Contains offloaded timelines and other tenant
/// shard-wide information that must be persisted in remote storage.

@@ -25,7 +25,7 @@ pub(super) fn period_jitter(d: Duration, pct: u32) -> Duration {
if d == Duration::ZERO {
d
} else {
rand::thread_rng().gen_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
rand::rng().random_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
}
}
@@ -35,7 +35,7 @@ pub(super) fn period_warmup(period: Duration) -> Duration {
if period == Duration::ZERO {
period
} else {
rand::thread_rng().gen_range(Duration::ZERO..period)
rand::rng().random_range(Duration::ZERO..period)
}
}
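Review note: the rename from `gen_range` to `random_range` leaves the sampled interval unchanged. To make that interval explicit, here is a small standard-library sketch that computes only the bounds used by `period_jitter`, without drawing a random value. `jitter_bounds` is a hypothetical helper name, and it assumes `pct <= 100` (a larger value would underflow `100 - pct`).

```rust
use std::ops::Range;
use std::time::Duration;

/// Half-open interval from which a jittered period would be drawn:
/// from `d * (100 - pct) / 100` up to, but excluding, `d * (100 + pct) / 100`.
fn jitter_bounds(d: Duration, pct: u32) -> Range<Duration> {
    (d * (100 - pct)) / 100..(d * (100 + pct)) / 100
}
```

For `d = Duration::ZERO` the interval is empty, which is presumably why both functions return early for a zero duration instead of sampling from it.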

@@ -1634,7 +1634,8 @@ pub(crate) mod test {
use bytes::Bytes;
use itertools::MinMaxResult;
use postgres_ffi::PgMajorVersion;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::prelude::{SeedableRng, StdRng};
use rand::seq::IndexedRandom;
use rand::{Rng, RngCore};
/// Construct an index for a fictional delta layer and then
@@ -1788,14 +1789,14 @@ pub(crate) mod test {
let mut entries = Vec::new();
for _ in 0..constants::KEY_COUNT {
let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
let count = rng.random_range(1..constants::MAX_ENTRIES_PER_KEY);
let mut lsns_iter =
std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
Some(Lsn(lsn.0 + 0x08))
});
let mut lsns = Vec::new();
while lsns.len() < count as usize {
let take = rng.gen_bool(0.5);
let take = rng.random_bool(0.5);
let lsn = lsns_iter.next().unwrap();
if take {
lsns.push(lsn);
@@ -1869,12 +1870,13 @@ pub(crate) mod test {
for _ in 0..constants::RANGES_COUNT {
let mut range: Option<Range<Key>> = Option::default();
while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
let range_start = rng.gen_range(start..end);
let range_start = rng.random_range(start..end);
let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
if range_end_offset >= end {
range = Some(Key::from_i128(range_start)..Key::from_i128(end));
} else {
let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
let range_end =
rng.random_range((range_start + constants::MIN_RANGE_SIZE)..end);
range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
}
}

@@ -440,8 +440,8 @@ mod tests {
impl InMemoryFile {
fn new_random(len: usize) -> Self {
Self {
content: rand::thread_rng()
.sample_iter(rand::distributions::Standard)
content: rand::rng()
.sample_iter(rand::distr::StandardUniform)
.take(len)
.collect(),
}
@@ -498,7 +498,7 @@ mod tests {
len
}
};
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[nread..]); // to discover bugs
rand::Rng::fill(&mut rand::rng(), &mut dst_slice[nread..]); // to discover bugs
Ok((dst, nread))
}
}
@@ -763,7 +763,7 @@ mod tests {
let len = std::cmp::min(dst.bytes_total(), mocked_bytes.len());
let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed();
dst_slice[..len].copy_from_slice(&mocked_bytes[..len]);
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[len..]); // to discover bugs
rand::Rng::fill(&mut rand::rng(), &mut dst_slice[len..]); // to discover bugs
Ok((dst, len))
}
Err(e) => Err(std::io::Error::other(e)),

@@ -515,7 +515,7 @@ pub(crate) async fn sleep_random_range(
interval: RangeInclusive<Duration>,
cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
let delay = rand::thread_rng().gen_range(interval);
let delay = rand::rng().random_range(interval);
if delay == Duration::ZERO {
return Ok(delay);
}

@@ -1324,6 +1324,9 @@ impl Timeline {
///
/// This naive implementation will be replaced with a more efficient one
/// which actually vectorizes the read path.
///
/// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
/// if the client goes away (e.g. due to timeout or cancellation).
pub(crate) async fn get_vectored(
&self,
query: VersionedKeySpaceQuery,
@@ -2823,7 +2826,7 @@ impl Timeline {
if r.numerator == 0 {
false
} else {
rand::thread_rng().gen_range(0..r.denominator) < r.numerator
rand::rng().random_range(0..r.denominator) < r.numerator
}
}
None => false,
@@ -3905,7 +3908,7 @@ impl Timeline {
// 1hour base
(60_i64 * 60_i64)
// 10min jitter
+ rand::thread_rng().gen_range(-10 * 60..10 * 60),
+ rand::rng().random_range(-10 * 60..10 * 60),
)
.expect("10min < 1hour"),
);

@@ -654,7 +654,7 @@ mod tests {
use pageserver_api::key::{DBDIR_KEY, Key, rel_block_to_key};
use pageserver_api::models::ShardParameters;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::DEFAULT_STRIPE_SIZE;
use utils::shard::ShardCount;
use utils::sync::gate::GateGuard;
@@ -955,7 +955,7 @@ mod tests {
});
let child_params = ShardParameters {
count: ShardCount(2),
stripe_size: ShardStripeSize::default(),
stripe_size: DEFAULT_STRIPE_SIZE,
};
let child0 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),

@@ -1275,8 +1275,8 @@ mod tests {
use std::sync::Arc;
use owned_buffers_io::io_buf_ext::IoBufExt;
use rand::Rng;
use rand::seq::SliceRandom;
use rand::{Rng, thread_rng};
use super::*;
use crate::context::DownloadBehavior;
@@ -1358,7 +1358,7 @@ mod tests {
// Check that all the other FDs still work too. Use them in random order for
// good measure.
file_b_dupes.as_mut_slice().shuffle(&mut thread_rng());
file_b_dupes.as_mut_slice().shuffle(&mut rand::rng());
for vfile in file_b_dupes.iter_mut() {
assert_first_512_eq(vfile, b"content_b").await;
}
@@ -1413,9 +1413,8 @@ mod tests {
let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
let hdl = rt.spawn(async move {
let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
let mut rng = rand::rngs::OsRng;
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
let f = &files[rand::rng().random_range(0..files.len())];
buf = f
.read_exact_at(buf.slice_full(), 0, &ctx)
.await

@@ -5,6 +5,7 @@ MODULE_big = neon
OBJS = \
$(WIN32RES) \
communicator.o \
communicator_process.o \
extension_server.o \
file_cache.o \
hll.o \
@@ -29,6 +30,11 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S), Darwin)
SHLIB_LINK += -framework Security -framework CoreFoundation -framework SystemConfiguration
endif
EXTENSION = neon
DATA = \
neon--1.0.sql \
@@ -57,7 +63,8 @@ WALPROP_OBJS = \
# libcommunicator.a is built by cargo from the Rust sources under communicator/
# subdirectory. `cargo build` also generates communicator_bindings.h.
neon.o: communicator/communicator_bindings.h
communicator_process.o: communicator/communicator_bindings.h
file_cache.o: communicator/communicator_bindings.h
$(NEON_CARGO_ARTIFACT_TARGET_DIR)/libcommunicator.a communicator/communicator_bindings.h &:
(cd $(srcdir)/communicator && cargo build $(CARGO_BUILD_FLAGS) $(CARGO_PROFILE))
@@ -80,11 +87,14 @@ libwalproposer.a: $(WALPROP_OBJS)
# INDENT pointing to pg_bsd_indent
# PGINDENT_SCRIPT pointing to pgindent (be careful with PGINDENT var name:
# pgindent will pick it up as pg_bsd_indent path).
#
# optional vars:
# PGINDENT_FLAGS additional flags to pass to pgindent
.PHONY: pgindent
pgindent:
+@ echo top_srcdir=$(top_srcdir) top_builddir=$(top_builddir) srcdir=$(srcdir)
$(FIND_TYPEDEF) . > neon.typedefs
INDENT=$(INDENT) $(PGINDENT_SCRIPT) --typedefs neon.typedefs $(srcdir)/*.c $(srcdir)/*.h
INDENT=$(INDENT) $(PGINDENT_SCRIPT) $(PGINDENT_FLAGS) --typedefs neon.typedefs $(srcdir)/*.c $(srcdir)/*.h
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)

@@ -1820,12 +1820,12 @@ nm_to_string(NeonMessage *msg)
}
case T_NeonGetPageResponse:
{
#if 0
NeonGetPageResponse *msg_resp = (NeonGetPageResponse *) msg;
#endif
appendStringInfoString(&s, "{\"type\": \"NeonGetPageResponse\"");
appendStringInfo(&s, ", \"page\": \"XXX\"}");
appendStringInfo(&s, ", \"rinfo\": %u/%u/%u", RelFileInfoFmt(msg_resp->req.rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_resp->req.forknum);
appendStringInfo(&s, ", \"blkno\": %u", msg_resp->req.blkno);
appendStringInfoChar(&s, '}');
break;
}

@@ -11,9 +11,19 @@ crate-type = ["staticlib"]
# 'testing' feature is currently unused in the communicator, but we accept it for convenience of
# calling build scripts, so that you can pass the same feature to all packages.
testing = []
# 'rest_broker' feature is currently unused in the communicator, but we accept it for convenience of
# calling build scripts, so that you can pass the same feature to all packages.
rest_broker = []
[dependencies]
neon-shmem.workspace = true
axum.workspace = true
http.workspace = true
tokio = { workspace = true, features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-subscriber.workspace = true
measured.workspace = true
utils.workspace = true
workspace_hack = { version = "0.1", path = "../../../workspace_hack" }
[build-dependencies]

@@ -1,7 +1,22 @@
This package will evolve into a "compute-pageserver communicator"
process and machinery. For now, it's just a dummy that doesn't do
anything interesting, but it allows us to test the compilation and
linking of Rust code into the Postgres extensions.
# Communicator
This package provides the so-called "compute-pageserver communicator",
or just "communicator" for short. The communicator is a separate
background worker process that runs in the PostgreSQL server. It's
part of the neon extension. Currently, it only provides an HTTP
endpoint for metrics, but in the future it will evolve to handle all
communications with the pageservers.
## Source code view
pgxn/neon/communicator_process.c
Contains code needed to start up the communicator process, and
the glue that interacts with PostgreSQL code and the Rust
code in the communicator process.
pgxn/neon/communicator/src/worker_process/
Worker process main loop and glue code
At compilation time, pgxn/neon/communicator/ produces a static
library, libcommunicator.a. It is linked to the neon.so extension

@@ -1,6 +1,5 @@
/// dummy function, just to test linking Rust functions into the C
/// extension
#[unsafe(no_mangle)]
pub extern "C" fn communicator_dummy(arg: u32) -> u32 {
arg + 1
}
mod worker_process;
/// Name of the Unix Domain Socket that serves the metrics, and other APIs in the
/// future. This is within the Postgres data directory.
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";

@@ -0,0 +1,51 @@
//! C callbacks to PostgreSQL facilities that the neon extension needs to provide. These
//! are implemented in `pgxn/neon/communicator_process.c`. The function signatures must
//! match!
//!
//! These are called from the communicator threads! Careful what you do, most Postgres
//! functions are not safe to call in that context.
#[cfg(not(test))]
unsafe extern "C" {
pub fn callback_set_my_latch_unsafe();
pub fn callback_get_lfc_metrics_unsafe() -> LfcMetrics;
}
// Compile unit tests with dummy versions of the functions. Unit tests cannot call back
// into the C code. (As of this writing, no unit tests even exist in the communicator
// package, but the code coverage build still builds these and tries to link with the
// external C code.)
#[cfg(test)]
unsafe fn callback_set_my_latch_unsafe() {
panic!("not usable in unit tests");
}
#[cfg(test)]
unsafe fn callback_get_lfc_metrics_unsafe() -> LfcMetrics {
panic!("not usable in unit tests");
}
// safe wrappers
pub(super) fn callback_set_my_latch() {
unsafe { callback_set_my_latch_unsafe() };
}
pub(super) fn callback_get_lfc_metrics() -> LfcMetrics {
unsafe { callback_get_lfc_metrics_unsafe() }
}
/// Return type of the callback_get_lfc_metrics() function.
#[repr(C)]
pub struct LfcMetrics {
pub lfc_cache_size_limit: i64,
pub lfc_hits: i64,
pub lfc_misses: i64,
pub lfc_used: i64,
pub lfc_writes: i64,
// working set size looking back 1..60 minutes.
//
// Index 0 is the size of the working set accessed within last 1 minute,
// index 59 is the size of the working set accessed within last 60 minutes.
pub lfc_approximate_working_set_size_windows: [i64; 60],
}
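
Since the C and Rust definitions of `LfcMetrics` have to agree, a layout check can catch accidental drift on the Rust side. The following is a minimal sketch, not part of the PR: `LfcMetricsMirror` and `EXPECTED_SIZE` are hypothetical names, and the struct is a local copy of the fields above so that the example compiles on its own.

```rust
use std::mem::{align_of, size_of};

/// Local copy of the `LfcMetrics` field layout, for illustration only
/// (hypothetical name; the real struct lives in the callbacks module).
#[repr(C)]
pub struct LfcMetricsMirror {
    pub lfc_cache_size_limit: i64,
    pub lfc_hits: i64,
    pub lfc_misses: i64,
    pub lfc_used: i64,
    pub lfc_writes: i64,
    pub lfc_approximate_working_set_size_windows: [i64; 60],
}

/// Five scalar counters plus sixty window entries, each one `i64`.
pub const EXPECTED_SIZE: usize = (5 + 60) * size_of::<i64>();

// Compile-time check: the build fails if a field is added or removed
// without updating the expectation.
const _: () = assert!(size_of::<LfcMetricsMirror>() == EXPECTED_SIZE);

fn main() {
    // All fields are i64, so `repr(C)` inserts no padding between them.
    assert_eq!(size_of::<LfcMetricsMirror>(), EXPECTED_SIZE);
    assert_eq!(align_of::<LfcMetricsMirror>(), align_of::<i64>());
    println!("LfcMetricsMirror is {} bytes", size_of::<LfcMetricsMirror>());
}
```

This only verifies the Rust side; the C struct generated by cbindgen would need an equivalent static assertion to close the loop.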

@@ -0,0 +1,102 @@
//! Communicator control socket.
//!
//! Currently, the control socket is used to provide information about the communicator
//! process, file cache etc. as prometheus metrics. In the future, it can be used to
//! expose more things.
//!
//! The exporter speaks HTTP and listens on a Unix Domain Socket under the Postgres
//! data directory. For debugging, you can access it with curl:
//!
//! ```sh
//! curl --unix-socket neon-communicator.socket http://localhost/metrics
//! ```
//!
use axum::Router;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use measured::MetricGroup;
use measured::text::BufferedTextEncoder;
use std::io::ErrorKind;
use tokio::net::UnixListener;
use crate::NEON_COMMUNICATOR_SOCKET_NAME;
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
impl CommunicatorWorkerProcessStruct {
/// Launch the listener
pub(crate) async fn launch_control_socket_listener(
&'static self,
) -> Result<(), std::io::Error> {
use axum::routing::get;
let app = Router::new()
.route("/metrics", get(get_metrics))
.route("/autoscaling_metrics", get(get_autoscaling_metrics))
.route("/debug/panic", get(handle_debug_panic))
.with_state(self);
// If the server is restarted, there might be an old socket still
// lying around. Remove it first.
match std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) {
Ok(()) => {
tracing::warn!("removed stale control socket");
}
Err(e) if e.kind() == ErrorKind::NotFound => {}
Err(e) => {
tracing::error!("could not remove stale control socket: {e:#}");
// Try to proceed anyway. It will likely fail below though.
}
};
// Create the unix domain socket and start listening on it
let listener = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?;
tokio::spawn(async {
tracing::info!("control socket listener spawned");
axum::serve(listener, app)
.await
.expect("axum::serve never returns")
});
Ok(())
}
}
/// Expose all Prometheus metrics.
async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response {
tracing::trace!("/metrics requested");
metrics_to_response(&state).await
}
/// Expose Prometheus metrics, for use by the autoscaling agent.
///
/// This is a subset of all the metrics.
async fn get_autoscaling_metrics(
State(state): State<&CommunicatorWorkerProcessStruct>,
) -> Response {
tracing::trace!("/autoscaling_metrics requested");
metrics_to_response(&state.lfc_metrics).await
}
async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response {
panic!("test HTTP handler task panic");
}
/// Helper function to convert prometheus metrics to a text response
async fn metrics_to_response(metrics: &(dyn MetricGroup<BufferedTextEncoder> + Sync)) -> Response {
let mut enc = BufferedTextEncoder::new();
metrics
.collect_group_into(&mut enc)
.unwrap_or_else(|never| match never {});
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "text/plain; version=0.0.4")
.body(Body::from(enc.finish()))
.unwrap()
}
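
The module comment shows how to reach the control socket with curl. For a programmatic check, the same request can be made with the standard library alone. The sketch below is an assumption-laden illustration rather than the client used in this PR: `http_get_over_unix_socket` is a hypothetical helper, it returns the raw response without parsing status, headers, or any chunked encoding, and it relies on `Connection: close` so that the read terminates.

```rust
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;

/// Send a minimal HTTP/1.1 GET over a Unix Domain Socket and return the raw
/// response (status line, headers and body) as one string.
/// Hypothetical helper for illustration; a real client would parse the response.
pub fn http_get_over_unix_socket(
    socket_path: &Path,
    request_path: &str,
) -> std::io::Result<String> {
    let mut stream = UnixStream::connect(socket_path)?;
    // `Connection: close` asks the server to close the socket after the
    // response, so that read_to_string() below sees end-of-file.
    write!(
        stream,
        "GET {request_path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
    )?;
    stream.flush()?;
    let mut response = String::new();
    stream.read_to_string(&mut response)?;
    Ok(response)
}

fn main() {
    // Assumes the current directory is the Postgres data directory, where the
    // socket named by NEON_COMMUNICATOR_SOCKET_NAME is created.
    match http_get_over_unix_socket(Path::new("neon-communicator.socket"), "/metrics") {
        Ok(response) => println!("{response}"),
        Err(e) => eprintln!("could not fetch metrics: {e}"),
    }
}
```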

@@ -0,0 +1,83 @@
use measured::{
FixedCardinalityLabel, Gauge, GaugeVec, LabelGroup, MetricGroup,
label::{LabelName, LabelValue, StaticLabelSet},
metric::{MetricEncoding, gauge::GaugeState, group::Encoding},
};
use super::callbacks::callback_get_lfc_metrics;
pub(crate) struct LfcMetricsCollector;
#[derive(MetricGroup)]
#[metric(new())]
struct LfcMetricsGroup {
/// LFC cache size limit in bytes
lfc_cache_size_limit: Gauge,
/// LFC cache hits
lfc_hits: Gauge,
/// LFC cache misses
lfc_misses: Gauge,
/// LFC chunks used (chunk = 1MB)
lfc_used: Gauge,
/// LFC cache writes
lfc_writes: Gauge,
/// Approximate working set size in pages of 8192 bytes
#[metric(init = GaugeVec::dense())]
lfc_approximate_working_set_size_windows: GaugeVec<StaticLabelSet<MinuteAsSeconds>>,
}
impl<T: Encoding> MetricGroup<T> for LfcMetricsCollector
where
GaugeState: MetricEncoding<T>,
{
fn collect_group_into(&self, enc: &mut T) -> Result<(), <T as Encoding>::Err> {
let g = LfcMetricsGroup::new();
let lfc_metrics = callback_get_lfc_metrics();
g.lfc_cache_size_limit.set(lfc_metrics.lfc_cache_size_limit);
g.lfc_hits.set(lfc_metrics.lfc_hits);
g.lfc_misses.set(lfc_metrics.lfc_misses);
g.lfc_used.set(lfc_metrics.lfc_used);
g.lfc_writes.set(lfc_metrics.lfc_writes);
for i in 0..60 {
let val = lfc_metrics.lfc_approximate_working_set_size_windows[i];
g.lfc_approximate_working_set_size_windows
.set(MinuteAsSeconds(i), val);
}
g.collect_group_into(enc)
}
}
/// Stores a minute index in the range 0..60 and encodes it as a
/// `duration_seconds` label value in seconds (60, 120, 180, ..., 3600)
#[derive(Clone, Copy)]
struct MinuteAsSeconds(usize);
impl FixedCardinalityLabel for MinuteAsSeconds {
fn cardinality() -> usize {
60
}
fn encode(&self) -> usize {
self.0
}
fn decode(value: usize) -> Self {
Self(value)
}
}
impl LabelValue for MinuteAsSeconds {
fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
v.write_int((self.0 + 1) as i64 * 60)
}
}
impl LabelGroup for MinuteAsSeconds {
fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
v.write_value(LabelName::from_str("duration_seconds"), self);
}
}
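
The relationship between the dense index used by `MinuteAsSeconds` and the `duration_seconds` label value is easy to get off by one. The sketch below spells out the mapping in isolation. Both function names are hypothetical, and the inverse mapping is not something the PR implements; it is included only to show that the encoding round-trips.

```rust
/// Map a dense label index (0..60) to the `duration_seconds` label value,
/// mirroring `(self.0 + 1) as i64 * 60` in `MinuteAsSeconds::visit`.
/// Hypothetical free function for illustration.
pub fn duration_seconds_label(index: usize) -> i64 {
    (index as i64 + 1) * 60
}

/// Inverse mapping: recover the dense index from a label value, if the value
/// is a whole number of minutes between 1 and 60.
pub fn index_from_duration_seconds(seconds: i64) -> Option<usize> {
    if (60..=3600).contains(&seconds) && seconds % 60 == 0 {
        Some((seconds / 60 - 1) as usize)
    } else {
        None
    }
}

fn main() {
    // Index 0 is the 1-minute window, index 59 the 60-minute window.
    assert_eq!(duration_seconds_label(0), 60);
    assert_eq!(duration_seconds_label(59), 3600);

    // Every index round-trips through the label value.
    for i in 0..60 {
        assert_eq!(index_from_duration_seconds(duration_seconds_label(i)), Some(i));
    }

    // Values that are not whole minutes in range have no index.
    assert_eq!(index_from_duration_seconds(0), None);
    assert_eq!(index_from_duration_seconds(90), None);
    assert_eq!(index_from_duration_seconds(3660), None);
}
```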

@@ -0,0 +1,250 @@
//! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log
//!
//! In the Rust threads, the log messages are written to a mpsc Channel, and the Postgres
//! process latch is raised. That wakes up the loop in the main thread, see
//! `communicator_new_bgworker_main()`. It reads the message from the channel and
//! ereport()s it. This ensures that only one thread, the main thread, calls the
//! PostgreSQL logging routines at any time.
use std::ffi::c_char;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::mpsc::{TryRecvError, TrySendError};
use tracing::info;
use tracing::{Event, Level, Metadata, Subscriber};
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields, MakeWriter};
use tracing_subscriber::registry::LookupSpan;
use crate::worker_process::callbacks::callback_set_my_latch;
/// This handle is passed to the C code, and used by [`communicator_worker_poll_logging`]
pub struct LoggingReceiver {
receiver: Receiver<FormattedEventWithMeta>,
}
/// This is passed to `tracing`
struct LoggingSender {
sender: SyncSender<FormattedEventWithMeta>,
}
static DROPPED_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
/// Called once, at worker process startup. The returned LoggingReceiver is passed back
/// in the subsequent calls to `communicator_worker_poll_logging`. It is opaque to the C code.
#[unsafe(no_mangle)]
pub extern "C" fn communicator_worker_configure_logging() -> Box<LoggingReceiver> {
let (sender, receiver) = sync_channel(1000);
let receiver = LoggingReceiver { receiver };
let sender = LoggingSender { sender };
use tracing_subscriber::prelude::*;
let r = tracing_subscriber::registry();
let r = r.with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.event_format(SimpleFormatter)
.with_writer(sender)
// TODO: derive this from log_min_messages? Currently the code in
// communicator_process.c forces log_min_messages='INFO'.
.with_filter(LevelFilter::from_level(Level::INFO)),
);
r.init();
info!("communicator process logging started");
Box::new(receiver)
}
/// Read one message from the logging queue. This is essentially a wrapper to Receiver,
/// with a C-friendly signature.
///
/// The message is copied into *errbuf, which is a caller-supplied buffer of size
/// `errbuf_len`. If the message doesn't fit in the buffer, it is truncated. It is always
/// NUL-terminated.
///
/// The error level is returned in *elevel_p. It's one of the PostgreSQL error levels, see
/// elog.h
///
/// If there was a message, *dropped_event_count_p is also updated with a counter of how
/// many log messages in total have been dropped. By comparing that with the value from
/// the previous call, you can tell how many were dropped since the last call.
///
/// Returns:
///
/// 0 if there were no messages
/// 1 if there was a message. The message and its level are returned in
/// *errbuf and *elevel_p. *dropped_event_count_p is also updated.
/// -1 on error, i.e. the other end of the queue was disconnected
#[unsafe(no_mangle)]
pub extern "C" fn communicator_worker_poll_logging(
state: &mut LoggingReceiver,
errbuf: *mut c_char,
errbuf_len: u32,
elevel_p: &mut i32,
dropped_event_count_p: &mut u64,
) -> i32 {
let msg = match state.receiver.try_recv() {
Err(TryRecvError::Empty) => return 0,
Err(TryRecvError::Disconnected) => return -1,
Ok(msg) => msg,
};
let src: &[u8] = &msg.message;
let dst: *mut u8 = errbuf.cast();
let len = std::cmp::min(src.len(), errbuf_len as usize - 1);
unsafe {
std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len);
*(dst.add(len)) = b'\0'; // NUL terminator
}
// Map the tracing Level to PostgreSQL elevel.
//
// XXX: These levels are copied from PostgreSQL's elog.h. Introduce another enum to
// hide these?
*elevel_p = match msg.level {
Level::TRACE => 10, // DEBUG5
Level::DEBUG => 14, // DEBUG1
Level::INFO => 17, // INFO
Level::WARN => 19, // WARNING
Level::ERROR => 21, // ERROR
};
*dropped_event_count_p = DROPPED_EVENT_COUNT.load(Ordering::Relaxed);
1
}
//---- The following functions can be called from any thread ----
#[derive(Clone)]
struct FormattedEventWithMeta {
message: Vec<u8>,
level: tracing::Level,
}
impl Default for FormattedEventWithMeta {
fn default() -> Self {
FormattedEventWithMeta {
message: Vec::new(),
level: tracing::Level::DEBUG,
}
}
}
struct EventBuilder<'a> {
event: FormattedEventWithMeta,
sender: &'a LoggingSender,
}
impl std::io::Write for EventBuilder<'_> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.event.message.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
self.sender.send_event(self.event.clone());
Ok(())
}
}
impl Drop for EventBuilder<'_> {
fn drop(&mut self) {
let sender = self.sender;
let event = std::mem::take(&mut self.event);
sender.send_event(event);
}
}
impl<'a> MakeWriter<'a> for LoggingSender {
type Writer = EventBuilder<'a>;
fn make_writer(&'a self) -> Self::Writer {
panic!("not expected to be called when make_writer_for is implemented");
}
fn make_writer_for(&'a self, meta: &Metadata<'_>) -> Self::Writer {
EventBuilder {
event: FormattedEventWithMeta {
message: Vec::new(),
level: *meta.level(),
},
sender: self,
}
}
}
impl LoggingSender {
fn send_event(&self, e: FormattedEventWithMeta) {
match self.sender.try_send(e) {
Ok(()) => {
// notify the main thread
callback_set_my_latch();
}
Err(TrySendError::Disconnected(_)) => {}
Err(TrySendError::Full(_)) => {
// The queue is full, cannot send any more. To avoid blocking the tokio
// thread, simply drop the message. Better to lose some logs than get
// stuck if there's a problem with the logging.
//
// Record the fact that a message was dropped by incrementing the
// counter.
DROPPED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
}
}
}
}
/// Simple formatter implementation for tracing_subscriber, which prints the log spans and
/// message part like the default formatter, but no timestamp or error level. The error
/// level is captured separately by `FormattedEventWithMeta`, and when the error is
/// printed by the main thread, with PostgreSQL ereport(), it gets a timestamp at that
/// point. (The timestamp printed will therefore lag behind the timestamp on the event
/// here, if the main thread doesn't process the log message promptly)
struct SimpleFormatter;
impl<S, N> FormatEvent<S, N> for SimpleFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &FmtContext<'_, S, N>,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> std::fmt::Result {
// Format all the spans in the event's span context.
if let Some(scope) = ctx.event_scope() {
for span in scope.from_root() {
write!(writer, "{}", span.name())?;
// `FormattedFields` is a formatted representation of the span's fields,
// which is stored in its extensions by the `fmt` layer's `new_span`
// method. The fields will have been formatted by the same field formatter
// that's provided to the event formatter in the `FmtContext`.
let ext = span.extensions();
let fields = &ext
.get::<FormattedFields<N>>()
.expect("will never be `None`");
// Skip formatting the fields if the span had no fields.
if !fields.is_empty() {
write!(writer, "{{{fields}}}")?;
}
write!(writer, ": ")?;
}
}
// Write fields on the event
ctx.field_format().format_fields(writer.by_ref(), event)?;
Ok(())
}
}
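
The core idea in `LoggingSender::send_event` is a bounded queue whose producer never blocks: when the queue is full the message is discarded and a counter is bumped. The sketch below isolates that pattern using only the standard library. `DroppingSender` is a hypothetical type; unlike the PR it keeps the counter in a field rather than a `static`, carries plain `String` messages, and does not wake a consumer.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Non-blocking sender over a bounded channel that counts dropped messages.
/// Hypothetical type for illustration.
pub struct DroppingSender {
    sender: SyncSender<String>,
    dropped: AtomicU64,
}

impl DroppingSender {
    /// Create a sender/receiver pair with room for `capacity` queued messages.
    pub fn new(capacity: usize) -> (Self, Receiver<String>) {
        let (sender, receiver) = sync_channel(capacity);
        let this = Self {
            sender,
            dropped: AtomicU64::new(0),
        };
        (this, receiver)
    }

    /// Try to queue a message without ever blocking the caller.
    /// Returns true if the message was queued.
    pub fn send(&self, msg: String) -> bool {
        match self.sender.try_send(msg) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) => {
                // Queue is full: drop the message and remember that we did.
                self.dropped.fetch_add(1, Ordering::Relaxed);
                false
            }
            // Receiver is gone; nothing useful can be done with the message.
            Err(TrySendError::Disconnected(_)) => false,
        }
    }

    /// Total number of messages dropped so far because the queue was full.
    pub fn dropped(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}

fn main() {
    let (tx, rx) = DroppingSender::new(2);
    for i in 0..5 {
        tx.send(format!("message {i}"));
    }
    // Only the first two fit; the remaining three were counted as dropped.
    let delivered: Vec<String> = rx.try_iter().collect();
    println!("delivered {:?}, dropped {}", delivered, tx.dropped());
}
```

Because the counter is cumulative, the consumer has to remember the previous value to work out how many messages were lost since it last looked.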

@@ -0,0 +1,66 @@
use std::str::FromStr as _;
use crate::worker_process::lfc_metrics::LfcMetricsCollector;
use measured::MetricGroup;
use measured::metric::MetricEncoding;
use measured::metric::gauge::GaugeState;
use measured::metric::group::Encoding;
use utils::id::{TenantId, TimelineId};
pub struct CommunicatorWorkerProcessStruct {
runtime: tokio::runtime::Runtime,
/*** Metrics ***/
pub(crate) lfc_metrics: LfcMetricsCollector,
}
/// Launch the communicator process's Rust subsystems
pub(super) fn init(
tenant_id: Option<&str>,
timeline_id: Option<&str>,
) -> Result<&'static CommunicatorWorkerProcessStruct, String> {
// The caller validated these already
let _tenant_id = tenant_id
.map(TenantId::from_str)
.transpose()
.map_err(|e| format!("invalid tenant ID: {e}"))?;
let _timeline_id = timeline_id
.map(TimelineId::from_str)
.transpose()
.map_err(|e| format!("invalid timeline ID: {e}"))?;
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("communicator thread")
.build()
.unwrap();
let worker_struct = CommunicatorWorkerProcessStruct {
// Note: it's important to not drop the runtime, or all the tasks are dropped
// too. Including it in the returned struct is one way to keep it around.
runtime,
// metrics
lfc_metrics: LfcMetricsCollector,
};
let worker_struct = Box::leak(Box::new(worker_struct));
// Start the listener on the control socket
worker_struct
.runtime
.block_on(worker_struct.launch_control_socket_listener())
.map_err(|e| e.to_string())?;
Ok(worker_struct)
}
impl<T> MetricGroup<T> for CommunicatorWorkerProcessStruct
where
T: Encoding,
GaugeState: MetricEncoding<T>,
{
fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> {
self.lfc_metrics.collect_group_into(enc)
}
}
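
`init` validates the optional IDs with an `Option::map` / `transpose` / `map_err` chain, which is compact but not obvious on first read. The sketch below shows the same chain as a generic helper. `parse_optional` is a hypothetical name, and `u32` stands in for `TenantId` and `TimelineId` so that the example has no external dependencies.

```rust
use std::fmt::Display;
use std::str::FromStr;

/// Parse an optional string into an optional value.
///
/// `None` stays `None`, a valid string becomes `Some(value)`, and a parse
/// failure becomes an error message mentioning `what`.
/// Hypothetical helper for illustration.
pub fn parse_optional<T>(input: Option<&str>, what: &str) -> Result<Option<T>, String>
where
    T: FromStr,
    T::Err: Display,
{
    input
        .map(T::from_str) // Option<Result<T, T::Err>>
        .transpose() // Result<Option<T>, T::Err>
        .map_err(|e| format!("invalid {what}: {e}"))
}

fn main() {
    // `u32` is only a stand-in for the real ID types.
    assert_eq!(parse_optional::<u32>(None, "tenant ID"), Ok(None));
    assert_eq!(parse_optional::<u32>(Some("42"), "tenant ID"), Ok(Some(42)));

    let err = parse_optional::<u32>(Some("abc"), "tenant ID").unwrap_err();
    assert!(err.starts_with("invalid tenant ID: "));
    println!("{err}");
}
```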

@@ -0,0 +1,13 @@
//! This code runs in the communicator worker process. This provides
//! the glue code to:
//!
//! - launch the main loop,
//! - receive IO requests from backends and process them,
//! - write results back to backends.
mod callbacks;
mod control_socket;
mod lfc_metrics;
mod logging;
mod main_loop;
mod worker_interface;

@@ -0,0 +1,60 @@
//! Functions called from the C code in the worker process
use std::ffi::{CStr, CString, c_char};
use crate::worker_process::main_loop;
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
/// Launch the communicator's tokio tasks, which do most of the work.
///
/// The caller has initialized the process as a regular PostgreSQL background worker
/// process.
///
/// Inputs:
/// `tenant_id` and `timeline_id` can be NULL, if we've been launched in "non-Neon" mode,
/// where we use local storage instead of connecting to remote neon storage. That's
/// currently only used in some unit tests.
///
/// Result:
/// Returns pointer to CommunicatorWorkerProcessStruct, which is a handle to running
/// Rust tasks. The C code can use it to interact with the Rust parts. On failure, returns
/// None/NULL, and an error message is returned in *error_p
///
/// This is called only once in the process, so the returned struct, and error message in
/// case of failure, are simply leaked.
#[unsafe(no_mangle)]
pub extern "C" fn communicator_worker_launch(
tenant_id: *const c_char,
timeline_id: *const c_char,
error_p: *mut *const c_char,
) -> Option<&'static CommunicatorWorkerProcessStruct> {
// Convert the arguments into more convenient Rust types
let tenant_id = if tenant_id.is_null() {
None
} else {
let cstr = unsafe { CStr::from_ptr(tenant_id) };
Some(cstr.to_str().expect("assume UTF-8"))
};
let timeline_id = if timeline_id.is_null() {
None
} else {
let cstr = unsafe { CStr::from_ptr(timeline_id) };
Some(cstr.to_str().expect("assume UTF-8"))
};
// The `init` function does all the work.
let result = main_loop::init(tenant_id, timeline_id);
// On failure, return the error message to the C caller in *error_p.
match result {
Ok(worker_struct) => Some(worker_struct),
Err(errmsg) => {
let errmsg = CString::new(errmsg).expect("no nuls within error message");
let errmsg = Box::leak(errmsg.into_boxed_c_str());
let p: *const c_char = errmsg.as_ptr();
unsafe { *error_p = p };
None
}
}
}
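
`communicator_worker_launch` converts nullable C strings into `Option<&str>` and hands a leaked error string back to C. The sketch below shows one way to write those two conversions as standalone helpers. Both function names are hypothetical, and the behavior differs from the PR in two labeled ways: invalid UTF-8 is reported as an error instead of panicking, and interior NUL bytes in the error message are stripped instead of triggering `expect`.

```rust
use std::ffi::{c_char, CStr, CString};

/// Convert a nullable C string pointer into `Option<&str>`.
/// Hypothetical helper; returns an error instead of panicking on invalid UTF-8.
///
/// # Safety
/// If `ptr` is non-null, it must point to a valid NUL-terminated string that
/// stays alive and unmodified for the lifetime `'a`.
pub unsafe fn str_from_nullable<'a>(ptr: *const c_char) -> Result<Option<&'a str>, String> {
    if ptr.is_null() {
        return Ok(None);
    }
    let cstr = unsafe { CStr::from_ptr(ptr) };
    cstr.to_str()
        .map(Some)
        .map_err(|e| format!("argument is not valid UTF-8: {e}"))
}

/// Leak an error message as a NUL-terminated C string that the C caller can
/// keep for the rest of the process lifetime. Interior NUL bytes are removed
/// so that the conversion cannot fail.
pub fn leak_error_message(msg: &str) -> *const c_char {
    let sanitized: Vec<u8> = msg.bytes().filter(|b| *b != 0).collect();
    let cstring = CString::new(sanitized).expect("NUL bytes were filtered out");
    Box::leak(cstring.into_boxed_c_str()).as_ptr()
}

fn main() {
    let owned = CString::new("0123abcd").unwrap();
    let parsed = unsafe { str_from_nullable(owned.as_ptr()) };
    assert_eq!(parsed, Ok(Some("0123abcd")));

    let missing = unsafe { str_from_nullable(std::ptr::null()) };
    assert_eq!(missing, Ok(None));

    // The leaked pointer stays valid, so reading it back is sound.
    let p = leak_error_message("invalid tenant ID");
    let back = unsafe { CStr::from_ptr(p) };
    assert_eq!(back.to_str(), Ok("invalid tenant ID"));
}
```

Leaking is acceptable here for the same reason the doc comment above gives: the launch function is called once per process.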

@@ -0,0 +1,273 @@
/*-------------------------------------------------------------------------
*
* communicator_process.c
* Functions for starting up the communicator background worker process.
*
* Currently, the communicator process only functions as a metrics
* exporter. It provides an HTTP endpoint for polling a limited set of
* metrics. TODO: In the future, it will do much more, i.e. handle all
* the communications with the pageservers.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <unistd.h>
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/timestamp.h"
#include "communicator_process.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_perf_counters.h"
/* the rust bindings, generated by cbindgen */
#include "communicator/communicator_bindings.h"
static void pump_logging(struct LoggingReceiver *logging);
PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg);
/**** Initialization functions. These run in postmaster ****/
void
pg_init_communicator_process(void)
{
BackgroundWorker bgw;
/* Initialize the background worker process */
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_PostmasterStart;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "communicator_new_bgworker_main");
snprintf(bgw.bgw_name, BGW_MAXLEN, "Storage communicator process");
snprintf(bgw.bgw_type, BGW_MAXLEN, "Storage communicator process");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
/**** Worker process functions. These run in the communicator worker process ****/
/*
* Entry point for the communicator bgworker process
*/
void
communicator_new_bgworker_main(Datum main_arg)
{
struct LoggingReceiver *logging;
const char *errmsg = NULL;
const struct CommunicatorWorkerProcessStruct *proc_handle;
/*
* Pretend that this process is a WAL sender. That affects the shutdown
* sequence: WAL senders are shut down last, after the final checkpoint
* has been written. That's what we want for the communicator process too.
*/
am_walsender = true;
MarkPostmasterChildWalSender();
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
/*
* Postmaster sends us SIGUSR2 when all regular backends and bgworkers
* have exited, and it's time for us to exit too
*/
pqsignal(SIGUSR2, die);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
/*
* By default, INFO messages are not printed to the log. We want
* `tracing::info!` messages emitted from the communicator to be printed,
* however, so increase the log level.
*
* XXX: This overrides any user-set value from the config file. That's not
* great, but on the other hand, there should be little reason for user to
* control the verbosity of the communicator. It's not too verbose by
* default.
*/
SetConfigOption("log_min_messages", "INFO", PGC_SUSET, PGC_S_OVERRIDE);
logging = communicator_worker_configure_logging();
proc_handle = communicator_worker_launch(
neon_tenant[0] == '\0' ? NULL : neon_tenant,
neon_timeline[0] == '\0' ? NULL : neon_timeline,
&errmsg
);
if (proc_handle == NULL)
{
/*
* Something went wrong. Before exiting, forward any log messages that
* might've been generated during the failed launch.
*/
pump_logging(logging);
elog(PANIC, "%s", errmsg);
}
/*
* The Rust tokio runtime has been launched, and it's running in the
* background now. This loop in the main thread handles any interactions
* we need with the rest of PostgreSQL.
*
* NB: This process is now multi-threaded! The Rust threads do not call
* into any Postgres functions, but it's not entirely clear which Postgres
* functions are safe to call from this main thread either. Be very
* careful about adding anything non-trivial here.
*
* Also note that we try to react quickly to any log messages arriving
* from the Rust thread. Be careful to not do anything too expensive here
* that might cause delays.
*/
elog(LOG, "communicator threads started");
for (;;)
{
TimestampTz before;
long duration;
ResetLatch(MyLatch);
/*
* Forward any log messages from the Rust threads into the normal
* Postgres logging facility.
*/
pump_logging(logging);
/*
* Check interrupts like system shutdown or config reload
*
* We mustn't block for too long within this loop, or we risk the log
* queue filling up and messages being lost. Also, even if we can keep
* up, if there's a long delay between sending a message and printing
* it to the log, the timestamps on the messages get skewed, which is
* confusing.
*
* We expect processing interrupts to happen fast enough that it's OK,
* but measure it just in case, and print a warning if it takes longer
* than 100 ms.
*/
#define LOG_SKEW_WARNING_MS 100
before = GetCurrentTimestamp();
CHECK_FOR_INTERRUPTS();
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
duration = TimestampDifferenceMilliseconds(before, GetCurrentTimestamp());
if (duration > LOG_SKEW_WARNING_MS)
elog(WARNING, "handling interrupts took %ld ms, communicator log timestamps might be skewed", duration);
/*
* Wait until we are woken up. The rust threads will set the latch
* when there's a log message to forward.
*/
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
0,
PG_WAIT_EXTENSION);
}
}
static void
pump_logging(struct LoggingReceiver *logging)
{
char errbuf[1000];
int elevel;
int32 rc;
static uint64_t last_dropped_event_count = 0;
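/* Only updated by the poll function when a message was returned, so start
 * from the last reported value to avoid reading it uninitialized. */
uint64_t dropped_event_count = last_dropped_event_count;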
uint64_t dropped_now;
for (;;)
{
rc = communicator_worker_poll_logging(logging,
errbuf,
sizeof(errbuf),
&elevel,
&dropped_event_count);
if (rc == 0)
{
/* nothing to do */
break;
}
else if (rc == 1)
{
/* Because we don't want to exit on error */
if (message_level_is_interesting(elevel))
{
/*
* Prevent interrupts while cleaning up.
*
* (Not sure if this is required, but all the error handlers
* in Postgres that are installed as sigsetjmp() targets do
* this, so let's follow the example)
*/
HOLD_INTERRUPTS();
errstart(elevel, TEXTDOMAIN);
errmsg_internal("[COMMUNICATOR] %s", errbuf);
EmitErrorReport();
FlushErrorState();
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
}
}
else if (rc == -1)
{
elog(ERROR, "logging channel was closed unexpectedly");
}
}
/*
* If the queue was full at any time since the last time we reported it,
* report how many messages were lost. We do this outside the loop, so
* that if the logging system is clogged, we don't exacerbate it by
* printing lots of warnings about dropped messages.
*/
dropped_now = dropped_event_count - last_dropped_event_count;
if (dropped_now != 0)
{
elog(WARNING, "%lu communicator log messages were dropped because the log buffer was full",
(unsigned long) dropped_now);
last_dropped_event_count = dropped_event_count;
}
}
/****
* Callbacks from the rust code, in the communicator process.
*
* NOTE: These must be thread-safe! It's very limited which PostgreSQL
* functions you can use!!!
*
* The signatures of these need to match those in the Rust code.
*/
void
callback_set_my_latch_unsafe(void)
{
SetLatch(MyLatch);
}
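
`pump_logging` turns a cumulative dropped-message counter into a "dropped since last report" figure. Because the poll function only updates the counter when it returns a message, a round that polls nothing should be treated as "no change". The sketch below models that bookkeeping in Rust for illustration; `DropReporter` is a hypothetical type and is not how the C code is structured.

```rust
/// Converts a cumulative dropped-message count into per-report deltas.
/// Hypothetical type for illustration.
pub struct DropReporter {
    last_reported: u64,
}

impl DropReporter {
    pub fn new() -> Self {
        Self { last_reported: 0 }
    }

    /// `latest_total` is the most recent cumulative count seen in this round,
    /// or `None` if no message was polled and the count is therefore unknown.
    /// Returns how many messages were dropped since the previous call.
    pub fn newly_dropped(&mut self, latest_total: Option<u64>) -> u64 {
        // With no new information, assume the total is unchanged.
        let total = latest_total.unwrap_or(self.last_reported);
        let delta = total.wrapping_sub(self.last_reported);
        self.last_reported = total;
        delta
    }
}

fn main() {
    let mut reporter = DropReporter::new();
    // A round with no messages reports nothing.
    assert_eq!(reporter.newly_dropped(None), 0);
    // Three messages were dropped before the first message got through.
    assert_eq!(reporter.newly_dropped(Some(3)), 3);
    // Nothing new was dropped in the next round.
    assert_eq!(reporter.newly_dropped(Some(3)), 0);
    // Two more were dropped later.
    assert_eq!(reporter.newly_dropped(Some(5)), 2);
}
```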

@@ -0,0 +1,17 @@
/*-------------------------------------------------------------------------
*
* communicator_process.h
* Communicator process
*
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef COMMUNICATOR_PROCESS_H
#define COMMUNICATOR_PROCESS_H
extern void pg_init_communicator_process(void);
#endif /* COMMUNICATOR_PROCESS_H */

@@ -52,6 +52,8 @@
#include "pagestore_client.h"
#include "communicator.h"
#include "communicator/communicator_bindings.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
/*
@@ -2156,6 +2158,38 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset)
return dc;
}
/*
* Get metrics, for the built-in metrics exporter that's part of the communicator
* process.
*
* NB: This is called from a Rust tokio task inside the communicator process.
* Acquiring lwlocks, elog(), allocating memory or anything else non-trivial
* is strictly prohibited here!
*/
struct LfcMetrics
callback_get_lfc_metrics_unsafe(void)
{
struct LfcMetrics result = {
.lfc_cache_size_limit = (int64) lfc_size_limit * 1024 * 1024,
.lfc_hits = lfc_ctl ? lfc_ctl->hits : 0,
.lfc_misses = lfc_ctl ? lfc_ctl->misses : 0,
.lfc_used = lfc_ctl ? lfc_ctl->used : 0,
.lfc_writes = lfc_ctl ? lfc_ctl->writes : 0,
};
if (lfc_ctl)
{
for (int minutes = 1; minutes <= 60; minutes++)
{
result.lfc_approximate_working_set_size_windows[minutes - 1] =
lfc_approximate_working_set_size_seconds(minutes * 60, false);
}
}
return result;
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum

Some files were not shown because too many files have changed in this diff