mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
Compare commits
20 Commits
conrad/try
...
tristan957
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0f5e118789 | ||
|
|
8bb45fd5da | ||
|
|
88bc06f148 | ||
|
|
d91d018afa | ||
|
|
9c0efba91e | ||
|
|
5464552020 | ||
|
|
80baeaa084 | ||
|
|
b7bc3ce61e | ||
|
|
050c9f704f | ||
|
|
0dbe551802 | ||
|
|
187170be47 | ||
|
|
30e1213141 | ||
|
|
25efbcc7f0 | ||
|
|
b2ecb10f91 | ||
|
|
5a48365fb9 | ||
|
|
194b9ffc41 | ||
|
|
1e30b31fa7 | ||
|
|
e181b996c3 | ||
|
|
1406bdc6a8 | ||
|
|
791b5d736b |
@@ -21,13 +21,14 @@ platforms = [
|
||||
# "x86_64-apple-darwin",
|
||||
# "x86_64-pc-windows-msvc",
|
||||
]
|
||||
|
||||
[final-excludes]
|
||||
workspace-members = [
|
||||
# vm_monitor benefits from the same Cargo.lock as the rest of our artifacts, but
|
||||
# it is built primarly in separate repo neondatabase/autoscaling and thus is excluded
|
||||
# from depending on workspace-hack because most of the dependencies are not used.
|
||||
"vm_monitor",
|
||||
# subzero-core is a stub crate that should be excluded from workspace-hack
|
||||
"subzero-core",
|
||||
# All of these exist in libs and are not usually built independently.
|
||||
# Putting workspace hack there adds a bottleneck for cargo builds.
|
||||
"compute_api",
|
||||
|
||||
28
.github/actions/prepare-for-subzero/action.yml
vendored
Normal file
28
.github/actions/prepare-for-subzero/action.yml
vendored
Normal file
@@ -0,0 +1,28 @@
|
||||
name: 'Prepare current job for subzero'
|
||||
description: >
|
||||
Set git token to access `neondatabase/subzero` from cargo build,
|
||||
and set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable to use git CLI
|
||||
|
||||
inputs:
|
||||
token:
|
||||
description: 'GitHub token with access to neondatabase/subzero'
|
||||
required: true
|
||||
|
||||
runs:
|
||||
using: "composite"
|
||||
|
||||
steps:
|
||||
- name: Set git token for neondatabase/subzero
|
||||
uses: pyTooling/Actions/with-post-step@2307b526df64d55e95884e072e49aac2a00a9afa # v5.1.0
|
||||
env:
|
||||
SUBZERO_ACCESS_TOKEN: ${{ inputs.token }}
|
||||
with:
|
||||
main: |
|
||||
git config --global url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
|
||||
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a
|
||||
post: |
|
||||
git config --global --unset url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
|
||||
|
||||
- name: Set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable
|
||||
shell: bash -euxo pipefail {0}
|
||||
run: echo "CARGO_NET_GIT_FETCH_WITH_CLI=true" >> ${GITHUB_ENV}
|
||||
@@ -86,6 +86,10 @@ jobs:
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- uses: ./.github/actions/prepare-for-subzero
|
||||
with:
|
||||
token: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
|
||||
- name: Set pg 14 revision for caching
|
||||
id: pg_v14_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
|
||||
@@ -116,7 +120,7 @@ jobs:
|
||||
ARCH: ${{ inputs.arch }}
|
||||
SANITIZERS: ${{ inputs.sanitizers }}
|
||||
run: |
|
||||
CARGO_FLAGS="--locked --features testing"
|
||||
CARGO_FLAGS="--locked --features testing,rest_broker"
|
||||
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
|
||||
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
|
||||
CARGO_PROFILE=""
|
||||
|
||||
4
.github/workflows/_check-codestyle-rust.yml
vendored
4
.github/workflows/_check-codestyle-rust.yml
vendored
@@ -46,6 +46,10 @@ jobs:
|
||||
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- uses: ./.github/actions/prepare-for-subzero
|
||||
with:
|
||||
token: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
|
||||
- name: Cache cargo deps
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
|
||||
4
.github/workflows/build-macos.yml
vendored
4
.github/workflows/build-macos.yml
vendored
@@ -54,6 +54,10 @@ jobs:
|
||||
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- uses: ./.github/actions/prepare-for-subzero
|
||||
with:
|
||||
token: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
|
||||
- name: Install build dependencies
|
||||
run: |
|
||||
|
||||
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -632,6 +632,8 @@ jobs:
|
||||
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
|
||||
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-bookworm
|
||||
DEBIAN_VERSION=bookworm
|
||||
secrets: |
|
||||
SUBZERO_ACCESS_TOKEN=${{ secrets.CI_ACCESS_TOKEN }}
|
||||
provenance: false
|
||||
push: true
|
||||
pull: true
|
||||
|
||||
1
.github/workflows/neon_extra_builds.yml
vendored
1
.github/workflows/neon_extra_builds.yml
vendored
@@ -72,6 +72,7 @@ jobs:
|
||||
check-macos-build:
|
||||
needs: [ check-permissions, files-changed ]
|
||||
uses: ./.github/workflows/build-macos.yml
|
||||
secrets: inherit
|
||||
with:
|
||||
pg_versions: ${{ needs.files-changed.outputs.postgres_changes }}
|
||||
rebuild_rust_code: ${{ fromJSON(needs.files-changed.outputs.rebuild_rust_code) }}
|
||||
|
||||
52
.github/workflows/pgindent.yml
vendored
Normal file
52
.github/workflows/pgindent.yml
vendored
Normal file
@@ -0,0 +1,52 @@
|
||||
name: pgindent Neon
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
- release
|
||||
paths:
|
||||
- 'pgxn/**.[ch]'
|
||||
- '.github/workflows/pgindent.yml'
|
||||
pull_request:
|
||||
paths:
|
||||
- 'pgxn/**.[ch]'
|
||||
- '.github/workflows/pgindent.yml'
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
|
||||
|
||||
jobs:
|
||||
pgindent:
|
||||
runs-on: ubuntu-24.04
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 1
|
||||
|
||||
- name: Set pg 17 revision for caching
|
||||
id: pg_v17_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Cache postgres v17 build
|
||||
id: cache_pg_17
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: pg_install/v17
|
||||
key: v1-${{ runner.os }}-release-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Run pgindent
|
||||
run: |
|
||||
make -s -j neon-pgindent-check
|
||||
|
||||
- name: How to fix
|
||||
if: ${{ failure() }}
|
||||
run: |
|
||||
echo Run \"make neon-pgindent\" in the event of a failure
|
||||
5
.gitignore
vendored
5
.gitignore
vendored
@@ -26,9 +26,14 @@ docker-compose/docker-compose-parallel.yml
|
||||
*.o
|
||||
*.so
|
||||
*.Po
|
||||
*.pid
|
||||
|
||||
# pgindent typedef lists
|
||||
*.list
|
||||
|
||||
# Node
|
||||
**/node_modules/
|
||||
|
||||
# various files for local testing
|
||||
/proxy/.subzero
|
||||
local_proxy.json
|
||||
|
||||
@@ -19,6 +19,7 @@ ln -s ../../pre-commit.py .git/hooks/pre-commit
|
||||
```
|
||||
|
||||
This will run following checks on staged files before each commit:
|
||||
- `pgindent` over any Neon Postgres extension files
|
||||
- `rustfmt`
|
||||
- checks for Python files, see [obligatory checks](/docs/sourcetree.md#obligatory-checks).
|
||||
|
||||
|
||||
220
Cargo.lock
generated
220
Cargo.lock
generated
@@ -52,6 +52,12 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aliasable"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
|
||||
|
||||
[[package]]
|
||||
name = "aligned-vec"
|
||||
version = "0.6.1"
|
||||
@@ -490,7 +496,7 @@ dependencies = [
|
||||
"hex",
|
||||
"hmac",
|
||||
"http 0.2.9",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"once_cell",
|
||||
"p256 0.11.1",
|
||||
"percent-encoding",
|
||||
@@ -631,7 +637,7 @@ dependencies = [
|
||||
"aws-smithy-types",
|
||||
"bytes",
|
||||
"http 0.2.9",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tracing",
|
||||
@@ -649,7 +655,7 @@ dependencies = [
|
||||
"bytes-utils",
|
||||
"futures-core",
|
||||
"http 0.2.9",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 0.4.5",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
@@ -698,7 +704,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"form_urlencoded",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
@@ -732,7 +738,7 @@ checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"mime",
|
||||
@@ -756,7 +762,7 @@ dependencies = [
|
||||
"form_urlencoded",
|
||||
"futures-util",
|
||||
"headers",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"mime",
|
||||
@@ -1090,7 +1096,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "975982cdb7ad6a142be15bdf84aea7ec6a9e5d4d797c004d43185b24cfe4e684"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"indexmap 2.9.0",
|
||||
"log",
|
||||
"proc-macro2",
|
||||
@@ -1228,7 +1234,7 @@ version = "4.5.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
@@ -1290,8 +1296,14 @@ dependencies = [
|
||||
name = "communicator"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"cbindgen",
|
||||
"neon-shmem",
|
||||
"http 1.3.1",
|
||||
"measured",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -1334,7 +1346,10 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures",
|
||||
"hostname-validator",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
"hyper-util",
|
||||
"indexmap 2.9.0",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
@@ -1357,6 +1372,7 @@ dependencies = [
|
||||
"ring",
|
||||
"rlimit",
|
||||
"rust-ini",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
@@ -1445,7 +1461,7 @@ name = "consumption_metrics"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"serde",
|
||||
]
|
||||
|
||||
@@ -1848,7 +1864,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"hex",
|
||||
"parking_lot 0.12.1",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"smallvec",
|
||||
"tracing",
|
||||
"utils",
|
||||
@@ -1969,7 +1985,7 @@ checksum = "0892a17df262a24294c382f0d5997571006e7a4348b4327557c4ff1cd4a8bccc"
|
||||
dependencies = [
|
||||
"darling",
|
||||
"either",
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
@@ -2093,7 +2109,7 @@ dependencies = [
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
"prometheus",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"remote_storage",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -2661,7 +2677,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"indexmap 2.9.0",
|
||||
"slab",
|
||||
"tokio",
|
||||
@@ -2743,7 +2759,7 @@ dependencies = [
|
||||
"base64 0.21.7",
|
||||
"bytes",
|
||||
"headers-core",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"httpdate",
|
||||
"mime",
|
||||
"sha1",
|
||||
@@ -2755,9 +2771,15 @@ version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
|
||||
dependencies = [
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.5.0"
|
||||
@@ -2833,9 +2855,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "1.1.0"
|
||||
version = "1.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
|
||||
checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
@@ -2860,7 +2882,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2871,7 +2893,7 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"pin-project-lite",
|
||||
]
|
||||
@@ -2995,7 +3017,7 @@ dependencies = [
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"h2 0.4.4",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"httparse",
|
||||
"httpdate",
|
||||
@@ -3028,7 +3050,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"hyper 1.4.1",
|
||||
"hyper-util",
|
||||
"rustls 0.22.4",
|
||||
@@ -3060,7 +3082,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"hyper 1.4.1",
|
||||
"pin-project-lite",
|
||||
@@ -3709,7 +3731,7 @@ version = "0.0.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
@@ -3770,8 +3792,8 @@ dependencies = [
|
||||
"once_cell",
|
||||
"procfs",
|
||||
"prometheus",
|
||||
"rand 0.8.5",
|
||||
"rand_distr 0.4.3",
|
||||
"rand 0.9.1",
|
||||
"rand_distr",
|
||||
"twox-hash",
|
||||
]
|
||||
|
||||
@@ -3863,7 +3885,7 @@ dependencies = [
|
||||
"lock_api",
|
||||
"nix 0.30.1",
|
||||
"rand 0.9.1",
|
||||
"rand_distr 0.5.1",
|
||||
"rand_distr",
|
||||
"rustc-hash 2.1.1",
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
@@ -4160,7 +4182,7 @@ checksum = "10a8a7f5f6ba7c1b286c2fbca0454eaba116f63bbe69ed250b642d36fbb04d80"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"opentelemetry",
|
||||
"reqwest",
|
||||
]
|
||||
@@ -4173,7 +4195,7 @@ checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-core",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"opentelemetry",
|
||||
"opentelemetry-http",
|
||||
"opentelemetry-proto",
|
||||
@@ -4252,6 +4274,30 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ouroboros"
|
||||
version = "0.18.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e0f050db9c44b97a94723127e6be766ac5c340c48f2c4bb3ffa11713744be59"
|
||||
dependencies = [
|
||||
"aliasable",
|
||||
"ouroboros_macro",
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ouroboros_macro"
|
||||
version = "0.18.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c7028bdd3d43083f6d8d4d5187680d0d3560d54df4cc9d752005268b41e64d0"
|
||||
dependencies = [
|
||||
"heck 0.4.1",
|
||||
"proc-macro2",
|
||||
"proc-macro2-diagnostics",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "outref"
|
||||
version = "0.5.1"
|
||||
@@ -4315,7 +4361,7 @@ dependencies = [
|
||||
"pageserver_client_grpc",
|
||||
"pageserver_page_api",
|
||||
"pprof",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -4381,7 +4427,7 @@ dependencies = [
|
||||
"hashlink",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
@@ -4412,7 +4458,7 @@ dependencies = [
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"procfs",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"range-set-blaze",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
@@ -4479,7 +4525,7 @@ dependencies = [
|
||||
"postgres_ffi_types",
|
||||
"postgres_versioninfo",
|
||||
"posthog_client_lite",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"serde",
|
||||
@@ -4549,7 +4595,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"pin-project-lite",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"svg_fmt",
|
||||
"tokio",
|
||||
"tracing",
|
||||
@@ -4922,7 +4968,7 @@ dependencies = [
|
||||
"fallible-iterator",
|
||||
"hmac",
|
||||
"memchr",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"sha2",
|
||||
"stringprep",
|
||||
"tokio",
|
||||
@@ -5114,7 +5160,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"itertools 0.10.5",
|
||||
"postgres-protocol",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"serde",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
@@ -5148,6 +5194,19 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2-diagnostics"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
"version_check",
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "procfs"
|
||||
version = "0.16.0"
|
||||
@@ -5217,7 +5276,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"itertools 0.12.1",
|
||||
"log",
|
||||
"multimap",
|
||||
@@ -5238,7 +5297,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"itertools 0.12.1",
|
||||
"log",
|
||||
"multimap",
|
||||
@@ -5334,7 +5393,7 @@ dependencies = [
|
||||
"hex",
|
||||
"hmac",
|
||||
"hostname",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body-util",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
@@ -5354,6 +5413,7 @@ dependencies = [
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"ouroboros",
|
||||
"p256 0.13.2",
|
||||
"papaya",
|
||||
"parking_lot 0.12.1",
|
||||
@@ -5364,8 +5424,9 @@ dependencies = [
|
||||
"postgres-protocol2",
|
||||
"postgres_backend",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"rand_distr 0.4.3",
|
||||
"rand 0.9.1",
|
||||
"rand_core 0.6.4",
|
||||
"rand_distr",
|
||||
"rcgen",
|
||||
"redis",
|
||||
"regex",
|
||||
@@ -5390,6 +5451,7 @@ dependencies = [
|
||||
"socket2",
|
||||
"strum_macros",
|
||||
"subtle",
|
||||
"subzero-core",
|
||||
"thiserror 1.0.69",
|
||||
"tikv-jemalloc-ctl",
|
||||
"tikv-jemallocator",
|
||||
@@ -5566,16 +5628,6 @@ dependencies = [
|
||||
"getrandom 0.3.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_distr"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
"rand 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_distr"
|
||||
version = "0.5.1"
|
||||
@@ -5705,14 +5757,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.10.2"
|
||||
version = "1.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
|
||||
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-automata 0.4.3",
|
||||
"regex-syntax 0.8.2",
|
||||
"regex-automata 0.4.9",
|
||||
"regex-syntax 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5726,13 +5778,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.4.3"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
|
||||
checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-syntax 0.8.2",
|
||||
"regex-syntax 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5749,9 +5801,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.8.2"
|
||||
version = "0.8.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
|
||||
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
|
||||
|
||||
[[package]]
|
||||
name = "relative-path"
|
||||
@@ -5789,7 +5841,7 @@ dependencies = [
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"reqwest",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
@@ -5821,7 +5873,7 @@ dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
@@ -5863,7 +5915,7 @@ checksum = "d1ccd3b55e711f91a9885a2fa6fbbb2e39db1776420b062efc058c6410f7e5e3"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"thiserror 1.0.69",
|
||||
@@ -5880,7 +5932,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"futures",
|
||||
"getrandom 0.2.11",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"hyper 1.4.1",
|
||||
"parking_lot 0.11.2",
|
||||
"reqwest",
|
||||
@@ -5901,7 +5953,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"getrandom 0.2.11",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"matchit",
|
||||
"opentelemetry",
|
||||
"reqwest",
|
||||
@@ -6260,7 +6312,7 @@ dependencies = [
|
||||
"fail",
|
||||
"futures",
|
||||
"hex",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
@@ -6279,7 +6331,7 @@ dependencies = [
|
||||
"postgres_versioninfo",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
@@ -6973,7 +7025,7 @@ dependencies = [
|
||||
"pageserver_client",
|
||||
"postgres_connection",
|
||||
"posthog_client_lite",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"routerify",
|
||||
@@ -7109,7 +7161,7 @@ version = "0.26.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"rustversion",
|
||||
@@ -7122,6 +7174,10 @@ version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
|
||||
|
||||
[[package]]
|
||||
name = "subzero-core"
|
||||
version = "3.0.1"
|
||||
|
||||
[[package]]
|
||||
name = "svg_fmt"
|
||||
version = "0.4.3"
|
||||
@@ -7732,7 +7788,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"percent-encoding",
|
||||
@@ -7756,7 +7812,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"flate2",
|
||||
"h2 0.4.4",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
@@ -7847,7 +7903,7 @@ dependencies = [
|
||||
"base64 0.22.1",
|
||||
"bitflags 2.8.0",
|
||||
"bytes",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"mime",
|
||||
"pin-project-lite",
|
||||
@@ -7868,7 +7924,7 @@ name = "tower-otel"
|
||||
version = "0.2.0"
|
||||
source = "git+https://github.com/mattiapenati/tower-otel?rev=56a7321053bcb72443888257b622ba0d43a11fcd#56a7321053bcb72443888257b622ba0d43a11fcd"
|
||||
dependencies = [
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"opentelemetry",
|
||||
"pin-project",
|
||||
"tower-layer",
|
||||
@@ -8049,7 +8105,7 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"data-encoding",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"httparse",
|
||||
"log",
|
||||
"rand 0.8.5",
|
||||
@@ -8068,7 +8124,7 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"data-encoding",
|
||||
"http 1.1.0",
|
||||
"http 1.3.1",
|
||||
"httparse",
|
||||
"log",
|
||||
"rand 0.8.5",
|
||||
@@ -8250,7 +8306,7 @@ dependencies = [
|
||||
"postgres_connection",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"regex",
|
||||
"scopeguard",
|
||||
"sentry",
|
||||
@@ -8857,8 +8913,8 @@ dependencies = [
|
||||
"quote",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
"regex-automata 0.4.3",
|
||||
"regex-syntax 0.8.2",
|
||||
"regex-automata 0.4.9",
|
||||
"regex-syntax 0.8.5",
|
||||
"reqwest",
|
||||
"rustls 0.23.27",
|
||||
"rustls-pki-types",
|
||||
@@ -8954,6 +9010,12 @@ version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd"
|
||||
|
||||
[[package]]
|
||||
name = "yansi"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
|
||||
|
||||
[[package]]
|
||||
name = "yasna"
|
||||
version = "0.5.2"
|
||||
|
||||
@@ -49,6 +49,7 @@ members = [
|
||||
"libs/proxy/tokio-postgres2",
|
||||
"endpoint_storage",
|
||||
"pgxn/neon/communicator",
|
||||
"proxy/subzero_core",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
@@ -157,7 +158,9 @@ procfs = "0.16"
|
||||
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
|
||||
prost = "0.13.5"
|
||||
prost-types = "0.13.5"
|
||||
rand = "0.8"
|
||||
rand = "0.9"
|
||||
# Remove after p256 is updated to 0.14.
|
||||
rand_core = "=0.6"
|
||||
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
|
||||
regex = "1.10.2"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
|
||||
|
||||
26
Dockerfile
26
Dockerfile
@@ -63,7 +63,14 @@ WORKDIR /home/nonroot
|
||||
|
||||
COPY --chown=nonroot . .
|
||||
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
|
||||
set -e \
|
||||
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
|
||||
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
|
||||
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero" && \
|
||||
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a; \
|
||||
fi \
|
||||
&& cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
# Main build image
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS build
|
||||
@@ -71,20 +78,33 @@ WORKDIR /home/nonroot
|
||||
ARG GIT_VERSION=local
|
||||
ARG BUILD_TAG
|
||||
ARG ADDITIONAL_RUSTFLAGS=""
|
||||
ENV CARGO_FEATURES="default"
|
||||
|
||||
# 3. Build cargo dependencies. Note that this step doesn't depend on anything else than
|
||||
# `recipe.json`, so the layer can be reused as long as none of the dependencies change.
|
||||
COPY --from=plan /home/nonroot/recipe.json recipe.json
|
||||
RUN set -e \
|
||||
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
|
||||
set -e \
|
||||
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
|
||||
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
|
||||
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"; \
|
||||
fi \
|
||||
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo chef cook --locked --release --recipe-path recipe.json
|
||||
|
||||
# Perform the main build. We reuse the Postgres build artifacts from the intermediate 'pg-build'
|
||||
# layer, and the cargo dependencies built in the previous step.
|
||||
COPY --chown=nonroot --from=pg-build /home/nonroot/pg_install/ pg_install
|
||||
COPY --chown=nonroot . .
|
||||
COPY --chown=nonroot --from=plan /home/nonroot/proxy/Cargo.toml proxy/Cargo.toml
|
||||
COPY --chown=nonroot --from=plan /home/nonroot/Cargo.lock Cargo.lock
|
||||
|
||||
RUN set -e \
|
||||
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
|
||||
set -e \
|
||||
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
|
||||
export CARGO_FEATURES="rest_broker"; \
|
||||
fi \
|
||||
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
|
||||
--features $CARGO_FEATURES \
|
||||
--bin pg_sni_router \
|
||||
--bin pageserver \
|
||||
--bin pagectl \
|
||||
|
||||
4
Makefile
4
Makefile
@@ -215,6 +215,10 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
|
||||
-C $(BUILD_DIR)/pgxn-v17/neon \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile pgindent
|
||||
|
||||
# Check whether pxgn/neon code is compliant with pgindent.
|
||||
.PHONY: pgindent
|
||||
neon-pgindent-check:
|
||||
$(MAKE) PGINDENT_FLAGS=--silent-diff neon-pgindent
|
||||
|
||||
.PHONY: setup-pre-commit-hook
|
||||
setup-pre-commit-hook:
|
||||
|
||||
@@ -27,7 +27,10 @@ fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
http-body-util.workspace = true
|
||||
hostname-validator = "1.1"
|
||||
hyper.workspace = true
|
||||
hyper-util.workspace = true
|
||||
indexmap.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
@@ -44,6 +47,7 @@ postgres.workspace = true
|
||||
regex.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
ring = "0.17"
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
98
compute_tools/src/communicator_socket_client.rs
Normal file
98
compute_tools/src/communicator_socket_client.rs
Normal file
@@ -0,0 +1,98 @@
|
||||
//! Client for making request to a running Postgres server's communicator control socket.
|
||||
//!
|
||||
//! The storage communicator process that runs inside Postgres exposes an HTTP endpoint in
|
||||
//! a Unix Domain Socket in the Postgres data directory. This provides access to it.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Context;
|
||||
use hyper::client::conn::http1::SendRequest;
|
||||
use hyper_util::rt::TokioIo;
|
||||
|
||||
/// Name of the socket within the Postgres data directory. This better match that in
|
||||
/// `pgxn/neon/communicator/src/lib.rs`.
|
||||
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
|
||||
|
||||
/// Open a connection to the communicator's control socket, prepare to send requests to it
|
||||
/// with hyper.
|
||||
pub async fn connect_communicator_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>>
|
||||
where
|
||||
B: hyper::body::Body + 'static + Send,
|
||||
B::Data: Send,
|
||||
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
|
||||
{
|
||||
let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME);
|
||||
let socket_path_len = socket_path.display().to_string().len();
|
||||
|
||||
// There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a
|
||||
// Unix Domain socket. The limit is on the connect(2) function used to open the
|
||||
// socket, not on the absolute path itself. Postgres changes the current directory to
|
||||
// the data directory and uses a relative path to bind to the socket, and the relative
|
||||
// path "./neon-communicator.socket" is always short, but when compute_ctl needs to
|
||||
// open the socket, we need to use a full path, which can be arbitrarily long.
|
||||
//
|
||||
// There are a few ways we could work around this:
|
||||
//
|
||||
// 1. Change the current directory to the Postgres data directory and use a relative
|
||||
// path in the connect(2) call. That's problematic because the current directory
|
||||
// applies to the whole process. We could change the current directory early in
|
||||
// compute_ctl startup, and that might be a good idea anyway for other reasons too:
|
||||
// it would be more robust if the data directory is moved around or unlinked for
|
||||
// some reason, and you would be less likely to accidentally litter other parts of
|
||||
// the filesystem with e.g. temporary files. However, that's a pretty invasive
|
||||
// change.
|
||||
//
|
||||
// 2. On Linux, you could open() the data directory, and refer to the the socket
|
||||
// inside it as "/proc/self/fd/<fd>/neon-communicator.socket". But that's
|
||||
// Linux-only.
|
||||
//
|
||||
// 3. Create a symbolic link to the socket with a shorter path, and use that.
|
||||
//
|
||||
// We use the symbolic link approach here. Hopefully the paths we use in production
|
||||
// are shorter, so that we can open the socket directly, so that this hack is needed
|
||||
// only in development.
|
||||
let connect_result = if socket_path_len < 100 {
|
||||
// We can open the path directly with no hacks.
|
||||
tokio::net::UnixStream::connect(socket_path).await
|
||||
} else {
|
||||
// The path to the socket is too long. Create a symlink to it with a shorter path.
|
||||
let short_path = std::env::temp_dir().join(format!(
|
||||
"compute_ctl.short-socket.{}.{}",
|
||||
std::process::id(),
|
||||
tokio::task::id()
|
||||
));
|
||||
std::os::unix::fs::symlink(&socket_path, &short_path)?;
|
||||
|
||||
// Delete the symlink as soon as we have connected to it. There's a small chance
|
||||
// of leaking if the process dies before we remove it, so try to keep that window
|
||||
// as small as possible.
|
||||
scopeguard::defer! {
|
||||
if let Err(err) = std::fs::remove_file(&short_path) {
|
||||
tracing::warn!("could not remove symlink \"{}\" created for socket: {}",
|
||||
short_path.display(), err);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"created symlink \"{}\" for socket \"{}\", opening it now",
|
||||
short_path.display(),
|
||||
socket_path.display()
|
||||
);
|
||||
|
||||
tokio::net::UnixStream::connect(&short_path).await
|
||||
};
|
||||
|
||||
let stream = connect_result.context("connecting to communicator control socket")?;
|
||||
|
||||
let io = TokioIo::new(stream);
|
||||
let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await?;
|
||||
|
||||
// spawn a task to poll the connection and drive the HTTP state
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = connection.await {
|
||||
eprintln!("Error in connection: {err}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(request_sender)
|
||||
}
|
||||
@@ -1,10 +1,18 @@
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use axum::body::Body;
|
||||
use axum::extract::State;
|
||||
use axum::response::Response;
|
||||
use http::StatusCode;
|
||||
use http::header::CONTENT_TYPE;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{Request, StatusCode};
|
||||
use metrics::proto::MetricFamily;
|
||||
use metrics::{Encoder, TextEncoder};
|
||||
|
||||
use crate::communicator_socket_client::connect_communicator_socket;
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::http::JsonResponse;
|
||||
use crate::metrics::collect;
|
||||
|
||||
@@ -31,3 +39,42 @@ pub(in crate::http) async fn get_metrics() -> Response {
|
||||
.body(Body::from(buffer))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Fetch and forward metrics from the Postgres neon extension's metrics
|
||||
/// exporter that are used by autoscaling-agent.
|
||||
///
|
||||
/// The neon extension exposes these metrics over a Unix domain socket
|
||||
/// in the data directory. That's not accessible directly from the outside
|
||||
/// world, so we have this endpoint in compute_ctl to expose it
|
||||
pub(in crate::http) async fn get_autoscaling_metrics(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Result<Response, Response> {
|
||||
let pgdata = Path::new(&compute.params.pgdata);
|
||||
|
||||
// Connect to the communicator process's metrics socket
|
||||
let mut metrics_client = connect_communicator_socket(pgdata)
|
||||
.await
|
||||
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
|
||||
|
||||
// Make a request for /autoscaling_metrics
|
||||
let request = Request::builder()
|
||||
.method("GET")
|
||||
.uri("/autoscaling_metrics")
|
||||
.header("Host", "localhost") // hyper requires Host, even though the server won't care
|
||||
.body(Body::from(""))
|
||||
.unwrap();
|
||||
let resp = metrics_client
|
||||
.send_request(request)
|
||||
.await
|
||||
.context("fetching metrics from Postgres metrics service")
|
||||
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
|
||||
|
||||
// Build a response that just forwards the response we got.
|
||||
let mut response = Response::builder();
|
||||
response = response.status(resp.status());
|
||||
if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
|
||||
response = response.header(CONTENT_TYPE, content_type);
|
||||
}
|
||||
let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream());
|
||||
Ok(response.body(body).unwrap())
|
||||
}
|
||||
|
||||
@@ -81,8 +81,12 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
|
||||
Server::External {
|
||||
config, compute_id, ..
|
||||
} => {
|
||||
let unauthenticated_router =
|
||||
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
|
||||
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route("/metrics", get(metrics::get_metrics))
|
||||
.route(
|
||||
"/autoscaling_metrics",
|
||||
get(metrics::get_autoscaling_metrics),
|
||||
);
|
||||
|
||||
let authenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
pub mod checker;
|
||||
pub mod communicator_socket_client;
|
||||
pub mod config;
|
||||
pub mod configurator;
|
||||
pub mod http;
|
||||
|
||||
@@ -411,7 +411,8 @@ impl ComputeNode {
|
||||
.map(|limit| match limit {
|
||||
0..10 => limit,
|
||||
10..30 => 10,
|
||||
30.. => limit / 3,
|
||||
30..300 => limit / 3,
|
||||
300.. => 100,
|
||||
})
|
||||
// If we didn't find max_connections, default to 10 concurrent connections.
|
||||
.unwrap_or(10)
|
||||
|
||||
@@ -8,10 +8,10 @@ code changes locally, but not suitable for running production systems.
|
||||
|
||||
## Example: Start with Postgres 16
|
||||
|
||||
To create and start a local development environment with Postgres 16, you will need to provide `--pg-version` flag to 3 of the start-up commands.
|
||||
To create and start a local development environment with Postgres 16, you will need to provide `--pg-version` flag to 2 of the start-up commands.
|
||||
|
||||
```shell
|
||||
cargo neon init --pg-version 16
|
||||
cargo neon init
|
||||
cargo neon start
|
||||
cargo neon tenant create --set-default --pg-version 16
|
||||
cargo neon endpoint create main --pg-version 16
|
||||
|
||||
@@ -407,6 +407,12 @@ struct StorageControllerStartCmdArgs {
|
||||
help = "Base port for the storage controller instance idenfified by instance-id (defaults to pageserver cplane api)"
|
||||
)]
|
||||
base_port: Option<u16>,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
help = "Whether the storage controller should handle pageserver-reported local disk loss events."
|
||||
)]
|
||||
handle_ps_local_disk_loss: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(clap::Args)]
|
||||
@@ -1809,6 +1815,7 @@ async fn handle_storage_controller(
|
||||
instance_id: args.instance_id,
|
||||
base_port: args.base_port,
|
||||
start_timeout: args.start_timeout,
|
||||
handle_ps_local_disk_loss: args.handle_ps_local_disk_loss,
|
||||
};
|
||||
|
||||
if let Err(e) = svc.start(start_args).await {
|
||||
|
||||
@@ -65,7 +65,6 @@ use jsonwebtoken::jwk::{
|
||||
OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse,
|
||||
};
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use pem::Pem;
|
||||
use reqwest::header::CONTENT_TYPE;
|
||||
use safekeeper_api::PgMajorVersion;
|
||||
@@ -77,6 +76,7 @@ use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef};
|
||||
use tracing::debug;
|
||||
use url::Host;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::shard::ShardStripeSize;
|
||||
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::postgresql_conf::PostgresConf;
|
||||
|
||||
@@ -56,6 +56,7 @@ pub struct NeonStorageControllerStartArgs {
|
||||
pub instance_id: u8,
|
||||
pub base_port: Option<u16>,
|
||||
pub start_timeout: humantime::Duration,
|
||||
pub handle_ps_local_disk_loss: Option<bool>,
|
||||
}
|
||||
|
||||
impl NeonStorageControllerStartArgs {
|
||||
@@ -64,6 +65,7 @@ impl NeonStorageControllerStartArgs {
|
||||
instance_id: 1,
|
||||
base_port: None,
|
||||
start_timeout,
|
||||
handle_ps_local_disk_loss: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -669,6 +671,10 @@ impl StorageController {
|
||||
|
||||
println!("Starting storage controller at {scheme}://{host}:{listen_port}");
|
||||
|
||||
if start_args.handle_ps_local_disk_loss.unwrap_or_default() {
|
||||
args.push("--handle-ps-local-disk-loss".to_string());
|
||||
}
|
||||
|
||||
background_process::start_process(
|
||||
COMMAND,
|
||||
&instance_dir,
|
||||
|
||||
@@ -35,6 +35,7 @@ reason = "The paste crate is a build-only dependency with no runtime components.
|
||||
# More documentation for the licenses section can be found here:
|
||||
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
|
||||
[licenses]
|
||||
version = 2
|
||||
allow = [
|
||||
"0BSD",
|
||||
"Apache-2.0",
|
||||
|
||||
@@ -233,7 +233,7 @@ mod tests {
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
use rand::Rng;
|
||||
let random = rand::thread_rng().r#gen::<u32>();
|
||||
let random = rand::rng().random::<u32>();
|
||||
|
||||
let s3_config = remote_storage::S3Config {
|
||||
bucket_name: var(REAL_S3_BUCKET).unwrap(),
|
||||
|
||||
@@ -90,7 +90,7 @@ impl<'a> IdempotencyKey<'a> {
|
||||
IdempotencyKey {
|
||||
now: Utc::now(),
|
||||
node_id,
|
||||
nonce: rand::thread_rng().gen_range(0..=9999),
|
||||
nonce: rand::rng().random_range(0..=9999),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ impl NodeOs {
|
||||
|
||||
/// Generate a random number in range [0, max).
|
||||
pub fn random(&self, max: u64) -> u64 {
|
||||
self.internal.rng.lock().gen_range(0..max)
|
||||
self.internal.rng.lock().random_range(0..max)
|
||||
}
|
||||
|
||||
/// Append a new event to the world event log.
|
||||
|
||||
@@ -32,10 +32,10 @@ impl Delay {
|
||||
/// Generate a random delay in range [min, max]. Return None if the
|
||||
/// message should be dropped.
|
||||
pub fn delay(&self, rng: &mut StdRng) -> Option<u64> {
|
||||
if rng.gen_bool(self.fail_prob) {
|
||||
if rng.random_bool(self.fail_prob) {
|
||||
return None;
|
||||
}
|
||||
Some(rng.gen_range(self.min..=self.max))
|
||||
Some(rng.random_range(self.min..=self.max))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ impl World {
|
||||
/// Create a new random number generator.
|
||||
pub fn new_rng(&self) -> StdRng {
|
||||
let mut rng = self.rng.lock();
|
||||
StdRng::from_rng(rng.deref_mut()).unwrap()
|
||||
StdRng::from_rng(rng.deref_mut())
|
||||
}
|
||||
|
||||
/// Create a new node.
|
||||
|
||||
@@ -17,5 +17,5 @@ procfs.workspace = true
|
||||
measured-process.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0.8"
|
||||
rand_distr = "0.4.3"
|
||||
rand.workspace = true
|
||||
rand_distr = "0.5"
|
||||
|
||||
@@ -260,7 +260,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_small() {
|
||||
let (actual, estimate) = test_cardinality(100, Zipf::new(100, 1.2f64).unwrap());
|
||||
let (actual, estimate) = test_cardinality(100, Zipf::new(100.0, 1.2f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [46, 30, 32]);
|
||||
assert!(51.3 < estimate[0] && estimate[0] < 51.4);
|
||||
@@ -270,7 +270,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_medium() {
|
||||
let (actual, estimate) = test_cardinality(10000, Zipf::new(10000, 1.2f64).unwrap());
|
||||
let (actual, estimate) = test_cardinality(10000, Zipf::new(10000.0, 1.2f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [2529, 1618, 1629]);
|
||||
assert!(2309.1 < estimate[0] && estimate[0] < 2309.2);
|
||||
@@ -280,7 +280,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_large() {
|
||||
let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(1_000_000, 1.2f64).unwrap());
|
||||
let (actual, estimate) =
|
||||
test_cardinality(1_000_000, Zipf::new(1_000_000.0, 1.2f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [129077, 79579, 79630]);
|
||||
assert!(126067.2 < estimate[0] && estimate[0] < 126067.3);
|
||||
@@ -290,7 +291,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_small2() {
|
||||
let (actual, estimate) = test_cardinality(100, Zipf::new(200, 0.8f64).unwrap());
|
||||
let (actual, estimate) = test_cardinality(100, Zipf::new(200.0, 0.8f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [92, 58, 60]);
|
||||
assert!(116.1 < estimate[0] && estimate[0] < 116.2);
|
||||
@@ -300,7 +301,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_medium2() {
|
||||
let (actual, estimate) = test_cardinality(10000, Zipf::new(20000, 0.8f64).unwrap());
|
||||
let (actual, estimate) = test_cardinality(10000, Zipf::new(20000.0, 0.8f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [8201, 5131, 5051]);
|
||||
assert!(6846.4 < estimate[0] && estimate[0] < 6846.5);
|
||||
@@ -310,7 +311,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_large2() {
|
||||
let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(2_000_000, 0.8f64).unwrap());
|
||||
let (actual, estimate) =
|
||||
test_cardinality(1_000_000, Zipf::new(2_000_000.0, 0.8f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [777847, 482069, 482246]);
|
||||
assert!(699437.4 < estimate[0] && estimate[0] < 699437.5);
|
||||
|
||||
@@ -16,5 +16,5 @@ rustc-hash.workspace = true
|
||||
tempfile = "3.14.0"
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0.9"
|
||||
rand.workspace = true
|
||||
rand_distr = "0.5.1"
|
||||
|
||||
@@ -596,6 +596,7 @@ pub struct TimelineImportRequest {
|
||||
pub timeline_id: TimelineId,
|
||||
pub start_lsn: Lsn,
|
||||
pub sk_set: Vec<NodeId>,
|
||||
pub force_upsert: bool,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Clone)]
|
||||
|
||||
@@ -981,12 +981,12 @@ mod tests {
|
||||
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
|
||||
|
||||
let key = Key {
|
||||
field1: rng.r#gen(),
|
||||
field2: rng.r#gen(),
|
||||
field3: rng.r#gen(),
|
||||
field4: rng.r#gen(),
|
||||
field5: rng.r#gen(),
|
||||
field6: rng.r#gen(),
|
||||
field1: rng.random(),
|
||||
field2: rng.random(),
|
||||
field3: rng.random(),
|
||||
field4: rng.random(),
|
||||
field5: rng.random(),
|
||||
field6: rng.random(),
|
||||
};
|
||||
|
||||
assert_eq!(key, Key::from_str(&format!("{key}")).unwrap());
|
||||
|
||||
@@ -443,9 +443,9 @@ pub struct ImportPgdataIdempotencyKey(pub String);
|
||||
impl ImportPgdataIdempotencyKey {
|
||||
pub fn random() -> Self {
|
||||
use rand::Rng;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use rand::distr::Alphanumeric;
|
||||
Self(
|
||||
rand::thread_rng()
|
||||
rand::rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(20)
|
||||
.map(char::from)
|
||||
|
||||
@@ -69,22 +69,6 @@ impl Hash for ShardIdentity {
|
||||
}
|
||||
}
|
||||
|
||||
/// Stripe size in number of pages
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
|
||||
pub struct ShardStripeSize(pub u32);
|
||||
|
||||
impl Default for ShardStripeSize {
|
||||
fn default() -> Self {
|
||||
DEFAULT_STRIPE_SIZE
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardStripeSize {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
/// Layout version: for future upgrades where we might change how the key->shard mapping works
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
|
||||
pub struct ShardLayout(u8);
|
||||
|
||||
@@ -21,6 +21,14 @@ pub struct ReAttachRequest {
|
||||
/// if the node already has a node_id set.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub register: Option<NodeRegisterRequest>,
|
||||
|
||||
/// Hadron: Optional flag to indicate whether the node is starting with an empty local disk.
|
||||
/// Will be set to true if the node couldn't find any local tenant data on startup, could be
|
||||
/// due to the node starting for the first time or due to a local SSD failure/disk wipe event.
|
||||
/// The flag may be used by the storage controller to update its observed state of the world
|
||||
/// to make sure that it sends explicit location_config calls to the node following the
|
||||
/// re-attach request.
|
||||
pub empty_local_disk: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
||||
@@ -203,12 +203,12 @@ impl fmt::Display for CancelKeyData {
|
||||
}
|
||||
}
|
||||
|
||||
use rand::distributions::{Distribution, Standard};
|
||||
impl Distribution<CancelKeyData> for Standard {
|
||||
use rand::distr::{Distribution, StandardUniform};
|
||||
impl Distribution<CancelKeyData> for StandardUniform {
|
||||
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> CancelKeyData {
|
||||
CancelKeyData {
|
||||
backend_pid: rng.r#gen(),
|
||||
cancel_key: rng.r#gen(),
|
||||
backend_pid: rng.random(),
|
||||
cancel_key: rng.random(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -155,10 +155,10 @@ pub struct ScramSha256 {
|
||||
|
||||
fn nonce() -> String {
|
||||
// rand 0.5's ThreadRng is cryptographically secure
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut rng = rand::rng();
|
||||
(0..NONCE_LENGTH)
|
||||
.map(|_| {
|
||||
let mut v = rng.gen_range(0x21u8..0x7e);
|
||||
let mut v = rng.random_range(0x21u8..0x7e);
|
||||
if v == 0x2c {
|
||||
v = 0x7e
|
||||
}
|
||||
|
||||
@@ -74,7 +74,6 @@ impl Header {
|
||||
}
|
||||
|
||||
/// An enum representing Postgres backend messages.
|
||||
#[non_exhaustive]
|
||||
pub enum Message {
|
||||
AuthenticationCleartextPassword,
|
||||
AuthenticationGss,
|
||||
@@ -145,16 +144,7 @@ impl Message {
|
||||
PARSE_COMPLETE_TAG => Message::ParseComplete,
|
||||
BIND_COMPLETE_TAG => Message::BindComplete,
|
||||
CLOSE_COMPLETE_TAG => Message::CloseComplete,
|
||||
NOTIFICATION_RESPONSE_TAG => {
|
||||
let process_id = buf.read_i32::<BigEndian>()?;
|
||||
let channel = buf.read_cstr()?;
|
||||
let message = buf.read_cstr()?;
|
||||
Message::NotificationResponse(NotificationResponseBody {
|
||||
process_id,
|
||||
channel,
|
||||
message,
|
||||
})
|
||||
}
|
||||
NOTIFICATION_RESPONSE_TAG => Message::NotificationResponse(NotificationResponseBody {}),
|
||||
COPY_DONE_TAG => Message::CopyDone,
|
||||
COMMAND_COMPLETE_TAG => {
|
||||
let tag = buf.read_cstr()?;
|
||||
@@ -543,28 +533,7 @@ impl NoticeResponseBody {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NotificationResponseBody {
|
||||
process_id: i32,
|
||||
channel: Bytes,
|
||||
message: Bytes,
|
||||
}
|
||||
|
||||
impl NotificationResponseBody {
|
||||
#[inline]
|
||||
pub fn process_id(&self) -> i32 {
|
||||
self.process_id
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn channel(&self) -> io::Result<&str> {
|
||||
get_str(&self.channel)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn message(&self) -> io::Result<&str> {
|
||||
get_str(&self.message)
|
||||
}
|
||||
}
|
||||
pub struct NotificationResponseBody {}
|
||||
|
||||
pub struct ParameterDescriptionBody {
|
||||
storage: Bytes,
|
||||
|
||||
@@ -28,7 +28,7 @@ const SCRAM_DEFAULT_SALT_LEN: usize = 16;
|
||||
/// special characters that would require escaping in an SQL command.
|
||||
pub async fn scram_sha_256(password: &[u8]) -> String {
|
||||
let mut salt: [u8; SCRAM_DEFAULT_SALT_LEN] = [0; SCRAM_DEFAULT_SALT_LEN];
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut rng = rand::rng();
|
||||
rng.fill_bytes(&mut salt);
|
||||
scram_sha_256_salt(password, salt).await
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::cancel_token::RawCancelToken;
|
||||
use crate::codec::{BackendMessages, FrontendMessage};
|
||||
use crate::codec::{BackendMessages, FrontendMessage, RecordNotices};
|
||||
use crate::config::{Host, SslMode};
|
||||
use crate::query::RowStream;
|
||||
use crate::simple_query::SimpleQueryStream;
|
||||
@@ -221,6 +221,18 @@ impl Client {
|
||||
&mut self.inner
|
||||
}
|
||||
|
||||
pub fn record_notices(&mut self, limit: usize) -> mpsc::UnboundedReceiver<Box<str>> {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
let notices = RecordNotices { sender: tx, limit };
|
||||
self.inner
|
||||
.sender
|
||||
.send(FrontendMessage::RecordNotices(notices))
|
||||
.ok();
|
||||
|
||||
rx
|
||||
}
|
||||
|
||||
/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
|
||||
/// to save a roundtrip
|
||||
pub async fn query_raw_txt<S, I>(
|
||||
|
||||
@@ -3,10 +3,17 @@ use std::io;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use postgres_protocol2::message::backend;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
pub enum FrontendMessage {
|
||||
Raw(Bytes),
|
||||
RecordNotices(RecordNotices),
|
||||
}
|
||||
|
||||
pub struct RecordNotices {
|
||||
pub sender: UnboundedSender<Box<str>>,
|
||||
pub limit: usize,
|
||||
}
|
||||
|
||||
pub enum BackendMessage {
|
||||
@@ -33,14 +40,11 @@ impl FallibleIterator for BackendMessages {
|
||||
|
||||
pub struct PostgresCodec;
|
||||
|
||||
impl Encoder<FrontendMessage> for PostgresCodec {
|
||||
impl Encoder<Bytes> for PostgresCodec {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
|
||||
match item {
|
||||
FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
|
||||
}
|
||||
|
||||
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> io::Result<()> {
|
||||
dst.extend_from_slice(&item);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
use std::net::IpAddr;
|
||||
|
||||
use postgres_protocol2::message::backend::Message;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::client::SocketConfig;
|
||||
use crate::codec::BackendMessage;
|
||||
use crate::config::Host;
|
||||
use crate::connect_raw::connect_raw;
|
||||
use crate::connect_socket::connect_socket;
|
||||
@@ -48,8 +46,8 @@ where
|
||||
let stream = connect_tls(socket, config.ssl_mode, tls).await?;
|
||||
let RawConnection {
|
||||
stream,
|
||||
parameters,
|
||||
delayed_notice,
|
||||
parameters: _,
|
||||
delayed_notice: _,
|
||||
process_id,
|
||||
secret_key,
|
||||
} = connect_raw(stream, config).await?;
|
||||
@@ -72,13 +70,7 @@ where
|
||||
secret_key,
|
||||
);
|
||||
|
||||
// delayed notices are always sent as "Async" messages.
|
||||
let delayed = delayed_notice
|
||||
.into_iter()
|
||||
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
|
||||
.collect();
|
||||
|
||||
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
|
||||
let connection = Connection::new(stream, conn_tx, conn_rx);
|
||||
|
||||
Ok((client, connection))
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::io;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use futures_util::{Sink, SinkExt, Stream, TryStreamExt, ready};
|
||||
use postgres_protocol2::authentication::sasl;
|
||||
@@ -14,7 +14,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::codec::Framed;
|
||||
|
||||
use crate::Error;
|
||||
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
|
||||
use crate::codec::{BackendMessage, BackendMessages, PostgresCodec};
|
||||
use crate::config::{self, AuthKeys, Config};
|
||||
use crate::maybe_tls_stream::MaybeTlsStream;
|
||||
use crate::tls::TlsStream;
|
||||
@@ -25,7 +25,7 @@ pub struct StartupStream<S, T> {
|
||||
delayed_notice: Vec<NoticeResponseBody>,
|
||||
}
|
||||
|
||||
impl<S, T> Sink<FrontendMessage> for StartupStream<S, T>
|
||||
impl<S, T> Sink<Bytes> for StartupStream<S, T>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
@@ -36,7 +36,7 @@ where
|
||||
Pin::new(&mut self.inner).poll_ready(cx)
|
||||
}
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: FrontendMessage) -> io::Result<()> {
|
||||
fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> io::Result<()> {
|
||||
Pin::new(&mut self.inner).start_send(item)
|
||||
}
|
||||
|
||||
@@ -120,10 +120,7 @@ where
|
||||
let mut buf = BytesMut::new();
|
||||
frontend::startup_message(&config.server_params, &mut buf).map_err(Error::encode)?;
|
||||
|
||||
stream
|
||||
.send(FrontendMessage::Raw(buf.freeze()))
|
||||
.await
|
||||
.map_err(Error::io)
|
||||
stream.send(buf.freeze()).await.map_err(Error::io)
|
||||
}
|
||||
|
||||
async fn authenticate<S, T>(stream: &mut StartupStream<S, T>, config: &Config) -> Result<(), Error>
|
||||
@@ -191,10 +188,7 @@ where
|
||||
let mut buf = BytesMut::new();
|
||||
frontend::password_message(password, &mut buf).map_err(Error::encode)?;
|
||||
|
||||
stream
|
||||
.send(FrontendMessage::Raw(buf.freeze()))
|
||||
.await
|
||||
.map_err(Error::io)
|
||||
stream.send(buf.freeze()).await.map_err(Error::io)
|
||||
}
|
||||
|
||||
async fn authenticate_sasl<S, T>(
|
||||
@@ -253,10 +247,7 @@ where
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
frontend::sasl_initial_response(mechanism, scram.message(), &mut buf).map_err(Error::encode)?;
|
||||
stream
|
||||
.send(FrontendMessage::Raw(buf.freeze()))
|
||||
.await
|
||||
.map_err(Error::io)?;
|
||||
stream.send(buf.freeze()).await.map_err(Error::io)?;
|
||||
|
||||
let body = match stream.try_next().await.map_err(Error::io)? {
|
||||
Some(Message::AuthenticationSaslContinue(body)) => body,
|
||||
@@ -272,10 +263,7 @@ where
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
frontend::sasl_response(scram.message(), &mut buf).map_err(Error::encode)?;
|
||||
stream
|
||||
.send(FrontendMessage::Raw(buf.freeze()))
|
||||
.await
|
||||
.map_err(Error::io)?;
|
||||
stream.send(buf.freeze()).await.map_err(Error::io)?;
|
||||
|
||||
let body = match stream.try_next().await.map_err(Error::io)? {
|
||||
Some(Message::AuthenticationSaslFinal(body)) => body,
|
||||
|
||||
@@ -1,22 +1,23 @@
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use futures_util::{Sink, Stream, ready};
|
||||
use postgres_protocol2::message::backend::Message;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use futures_util::{Sink, StreamExt, ready};
|
||||
use postgres_protocol2::message::backend::{Message, NoticeResponseBody};
|
||||
use postgres_protocol2::message::frontend;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::codec::Framed;
|
||||
use tokio_util::sync::PollSender;
|
||||
use tracing::{info, trace};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
|
||||
use crate::error::DbError;
|
||||
use crate::Error;
|
||||
use crate::codec::{
|
||||
BackendMessage, BackendMessages, FrontendMessage, PostgresCodec, RecordNotices,
|
||||
};
|
||||
use crate::maybe_tls_stream::MaybeTlsStream;
|
||||
use crate::{AsyncMessage, Error, Notification};
|
||||
|
||||
#[derive(PartialEq, Debug)]
|
||||
enum State {
|
||||
@@ -33,18 +34,18 @@ enum State {
|
||||
/// occurred, or because its associated `Client` has dropped and all outstanding work has completed.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct Connection<S, T> {
|
||||
/// HACK: we need this in the Neon Proxy.
|
||||
pub stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
|
||||
/// HACK: we need this in the Neon Proxy to forward params.
|
||||
pub parameters: HashMap<String, String>,
|
||||
stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
|
||||
|
||||
sender: PollSender<BackendMessages>,
|
||||
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
|
||||
notices: Option<RecordNotices>,
|
||||
|
||||
pending_responses: VecDeque<BackendMessage>,
|
||||
pending_response: Option<BackendMessages>,
|
||||
state: State,
|
||||
}
|
||||
|
||||
pub enum Never {}
|
||||
|
||||
impl<S, T> Connection<S, T>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
@@ -52,70 +53,42 @@ where
|
||||
{
|
||||
pub(crate) fn new(
|
||||
stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
|
||||
pending_responses: VecDeque<BackendMessage>,
|
||||
parameters: HashMap<String, String>,
|
||||
sender: mpsc::Sender<BackendMessages>,
|
||||
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
|
||||
) -> Connection<S, T> {
|
||||
Connection {
|
||||
stream,
|
||||
parameters,
|
||||
sender: PollSender::new(sender),
|
||||
receiver,
|
||||
pending_responses,
|
||||
notices: None,
|
||||
pending_response: None,
|
||||
state: State::Active,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_response(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<BackendMessage, Error>>> {
|
||||
if let Some(message) = self.pending_responses.pop_front() {
|
||||
trace!("retrying pending response");
|
||||
return Poll::Ready(Some(Ok(message)));
|
||||
}
|
||||
|
||||
Pin::new(&mut self.stream)
|
||||
.poll_next(cx)
|
||||
.map(|o| o.map(|r| r.map_err(Error::io)))
|
||||
}
|
||||
|
||||
/// Read and process messages from the connection to postgres.
|
||||
/// client <- postgres
|
||||
fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<Result<AsyncMessage, Error>> {
|
||||
fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<Result<Never, Error>> {
|
||||
loop {
|
||||
let message = match self.poll_response(cx)? {
|
||||
Poll::Ready(Some(message)) => message,
|
||||
Poll::Ready(None) => return Poll::Ready(Err(Error::closed())),
|
||||
Poll::Pending => {
|
||||
trace!("poll_read: waiting on response");
|
||||
return Poll::Pending;
|
||||
}
|
||||
};
|
||||
|
||||
let messages = match message {
|
||||
BackendMessage::Async(Message::NoticeResponse(body)) => {
|
||||
let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?;
|
||||
return Poll::Ready(Ok(AsyncMessage::Notice(error)));
|
||||
}
|
||||
BackendMessage::Async(Message::NotificationResponse(body)) => {
|
||||
let notification = Notification {
|
||||
process_id: body.process_id(),
|
||||
channel: body.channel().map_err(Error::parse)?.to_string(),
|
||||
payload: body.message().map_err(Error::parse)?.to_string(),
|
||||
let messages = match self.pending_response.take() {
|
||||
Some(messages) => messages,
|
||||
None => {
|
||||
let message = match self.stream.poll_next_unpin(cx) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(None) => return Poll::Ready(Err(Error::closed())),
|
||||
Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(Error::io(e))),
|
||||
Poll::Ready(Some(Ok(message))) => message,
|
||||
};
|
||||
return Poll::Ready(Ok(AsyncMessage::Notification(notification)));
|
||||
|
||||
match message {
|
||||
BackendMessage::Async(Message::NoticeResponse(body)) => {
|
||||
self.handle_notice(body)?;
|
||||
continue;
|
||||
}
|
||||
BackendMessage::Async(_) => continue,
|
||||
BackendMessage::Normal { messages } => messages,
|
||||
}
|
||||
}
|
||||
BackendMessage::Async(Message::ParameterStatus(body)) => {
|
||||
self.parameters.insert(
|
||||
body.name().map_err(Error::parse)?.to_string(),
|
||||
body.value().map_err(Error::parse)?.to_string(),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
BackendMessage::Async(_) => unreachable!(),
|
||||
BackendMessage::Normal { messages } => messages,
|
||||
};
|
||||
|
||||
match self.sender.poll_reserve(cx) {
|
||||
@@ -126,8 +99,7 @@ where
|
||||
return Poll::Ready(Err(Error::closed()));
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.pending_responses
|
||||
.push_back(BackendMessage::Normal { messages });
|
||||
self.pending_response = Some(messages);
|
||||
trace!("poll_read: waiting on sender");
|
||||
return Poll::Pending;
|
||||
}
|
||||
@@ -135,6 +107,31 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_notice(&mut self, body: NoticeResponseBody) -> Result<(), Error> {
|
||||
let Some(notices) = &mut self.notices else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let mut fields = body.fields();
|
||||
while let Some(field) = fields.next().map_err(Error::parse)? {
|
||||
// loop until we find the message field
|
||||
if field.type_() == b'M' {
|
||||
// if the message field is within the limit, send it.
|
||||
if let Some(new_limit) = notices.limit.checked_sub(field.value().len()) {
|
||||
match notices.sender.send(field.value().into()) {
|
||||
// set the new limit.
|
||||
Ok(()) => notices.limit = new_limit,
|
||||
// closed.
|
||||
Err(_) => self.notices = None,
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch the next client request and enqueue the response sender.
|
||||
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<FrontendMessage>> {
|
||||
if self.receiver.is_closed() {
|
||||
@@ -168,21 +165,23 @@ where
|
||||
|
||||
match self.poll_request(cx) {
|
||||
// send the message to postgres
|
||||
Poll::Ready(Some(request)) => {
|
||||
Poll::Ready(Some(FrontendMessage::Raw(request))) => {
|
||||
Pin::new(&mut self.stream)
|
||||
.start_send(request)
|
||||
.map_err(Error::io)?;
|
||||
}
|
||||
Poll::Ready(Some(FrontendMessage::RecordNotices(notices))) => {
|
||||
self.notices = Some(notices)
|
||||
}
|
||||
// No more messages from the client, and no more responses to wait for.
|
||||
// Send a terminate message to postgres
|
||||
Poll::Ready(None) => {
|
||||
trace!("poll_write: at eof, terminating");
|
||||
let mut request = BytesMut::new();
|
||||
frontend::terminate(&mut request);
|
||||
let request = FrontendMessage::Raw(request.freeze());
|
||||
|
||||
Pin::new(&mut self.stream)
|
||||
.start_send(request)
|
||||
.start_send(request.freeze())
|
||||
.map_err(Error::io)?;
|
||||
|
||||
trace!("poll_write: sent eof, closing");
|
||||
@@ -231,34 +230,17 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the value of a runtime parameter for this connection.
|
||||
pub fn parameter(&self, name: &str) -> Option<&str> {
|
||||
self.parameters.get(name).map(|s| &**s)
|
||||
}
|
||||
|
||||
/// Polls for asynchronous messages from the server.
|
||||
///
|
||||
/// The server can send notices as well as notifications asynchronously to the client. Applications that wish to
|
||||
/// examine those messages should use this method to drive the connection rather than its `Future` implementation.
|
||||
pub fn poll_message(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<AsyncMessage, Error>>> {
|
||||
fn poll_message(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Never, Error>>> {
|
||||
if self.state != State::Closing {
|
||||
// if the state is still active, try read from and write to postgres.
|
||||
let message = self.poll_read(cx)?;
|
||||
let closing = self.poll_write(cx)?;
|
||||
if let Poll::Ready(()) = closing {
|
||||
let Poll::Pending = self.poll_read(cx)?;
|
||||
if self.poll_write(cx)?.is_ready() {
|
||||
self.state = State::Closing;
|
||||
}
|
||||
|
||||
if let Poll::Ready(message) = message {
|
||||
return Poll::Ready(Some(Ok(message)));
|
||||
}
|
||||
|
||||
// poll_read returned Pending.
|
||||
// poll_write returned Pending or Ready(WriteReady::WaitingOnRead).
|
||||
// if poll_write returned Ready(WriteReady::WaitingOnRead), then we are waiting to read more data from postgres.
|
||||
// poll_write returned Pending or Ready(()).
|
||||
// if poll_write returned Ready(()), then we are waiting to read more data from postgres.
|
||||
if self.state != State::Closing {
|
||||
return Poll::Pending;
|
||||
}
|
||||
@@ -280,11 +262,9 @@ where
|
||||
type Output = Result<(), Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
||||
while let Some(message) = ready!(self.poll_message(cx)?) {
|
||||
if let AsyncMessage::Notice(notice) = message {
|
||||
info!("{}: {}", notice.severity(), notice.message());
|
||||
}
|
||||
match self.poll_message(cx)? {
|
||||
Poll::Ready(None) => Poll::Ready(Ok(())),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ pub use crate::client::{Client, SocketConfig};
|
||||
pub use crate::config::Config;
|
||||
pub use crate::connect_raw::RawConnection;
|
||||
pub use crate::connection::Connection;
|
||||
use crate::error::DbError;
|
||||
pub use crate::error::Error;
|
||||
pub use crate::generic_client::GenericClient;
|
||||
pub use crate::query::RowStream;
|
||||
@@ -93,21 +92,6 @@ impl Notification {
|
||||
}
|
||||
}
|
||||
|
||||
/// An asynchronous message from the server.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Debug, Clone)]
|
||||
#[non_exhaustive]
|
||||
pub enum AsyncMessage {
|
||||
/// A notice.
|
||||
///
|
||||
/// Notices use the same format as errors, but aren't "errors" per-se.
|
||||
Notice(DbError),
|
||||
/// A notification.
|
||||
///
|
||||
/// Connections can subscribe to notifications with the `LISTEN` command.
|
||||
Notification(Notification),
|
||||
}
|
||||
|
||||
/// Message returned by the `SimpleQuery` stream.
|
||||
#[derive(Debug)]
|
||||
#[non_exhaustive]
|
||||
|
||||
@@ -43,7 +43,7 @@ itertools.workspace = true
|
||||
sync_wrapper = { workspace = true, features = ["futures"] }
|
||||
|
||||
byteorder = "1.4"
|
||||
rand = "0.8.5"
|
||||
rand.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
camino-tempfile.workspace = true
|
||||
|
||||
@@ -81,7 +81,7 @@ impl UnreliableWrapper {
|
||||
///
|
||||
fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
|
||||
let mut attempts = self.attempts.lock().unwrap();
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut rng = rand::rng();
|
||||
|
||||
match attempts.entry(op) {
|
||||
Entry::Occupied(mut e) => {
|
||||
@@ -94,7 +94,7 @@ impl UnreliableWrapper {
|
||||
/* BEGIN_HADRON */
|
||||
// If there are more attempts to fail, fail the request by probability.
|
||||
if (attempts_before_this < self.attempts_to_fail)
|
||||
&& (rng.gen_range(0..=100) < self.attempt_failure_probability)
|
||||
&& (rng.random_range(0..=100) < self.attempt_failure_probability)
|
||||
{
|
||||
let error =
|
||||
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
|
||||
|
||||
@@ -208,7 +208,7 @@ async fn create_azure_client(
|
||||
.as_millis();
|
||||
|
||||
// because nanos can be the same for two threads so can millis, add randomness
|
||||
let random = rand::thread_rng().r#gen::<u32>();
|
||||
let random = rand::rng().random::<u32>();
|
||||
|
||||
let remote_storage_config = RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::AzureContainer(AzureConfig {
|
||||
|
||||
@@ -385,7 +385,7 @@ async fn create_s3_client(
|
||||
.as_millis();
|
||||
|
||||
// because nanos can be the same for two threads so can millis, add randomness
|
||||
let random = rand::thread_rng().r#gen::<u32>();
|
||||
let random = rand::rng().random::<u32>();
|
||||
|
||||
let remote_storage_config = RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::AwsS3(S3Config {
|
||||
|
||||
@@ -104,7 +104,7 @@ impl Id {
|
||||
|
||||
pub fn generate() -> Self {
|
||||
let mut tli_buf = [0u8; 16];
|
||||
rand::thread_rng().fill(&mut tli_buf);
|
||||
rand::rng().fill(&mut tli_buf);
|
||||
Id::from(tli_buf)
|
||||
}
|
||||
|
||||
|
||||
@@ -364,42 +364,37 @@ impl MonotonicCounter<Lsn> for RecordLsn {
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
|
||||
/// Implements [`rand::distr::uniform::UniformSampler`] so we can sample [`Lsn`]s.
|
||||
///
|
||||
/// This is used by the `pagebench` pageserver benchmarking tool.
|
||||
pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
|
||||
pub struct LsnSampler(<u64 as rand::distr::uniform::SampleUniform>::Sampler);
|
||||
|
||||
impl rand::distributions::uniform::SampleUniform for Lsn {
|
||||
impl rand::distr::uniform::SampleUniform for Lsn {
|
||||
type Sampler = LsnSampler;
|
||||
}
|
||||
|
||||
impl rand::distributions::uniform::UniformSampler for LsnSampler {
|
||||
impl rand::distr::uniform::UniformSampler for LsnSampler {
|
||||
type X = Lsn;
|
||||
|
||||
fn new<B1, B2>(low: B1, high: B2) -> Self
|
||||
fn new<B1, B2>(low: B1, high: B2) -> Result<Self, rand::distr::uniform::Error>
|
||||
where
|
||||
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B1: rand::distr::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B2: rand::distr::uniform::SampleBorrow<Self::X> + Sized,
|
||||
{
|
||||
Self(
|
||||
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
|
||||
low.borrow().0,
|
||||
high.borrow().0,
|
||||
),
|
||||
)
|
||||
<u64 as rand::distr::uniform::SampleUniform>::Sampler::new(low.borrow().0, high.borrow().0)
|
||||
.map(Self)
|
||||
}
|
||||
|
||||
fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
|
||||
fn new_inclusive<B1, B2>(low: B1, high: B2) -> Result<Self, rand::distr::uniform::Error>
|
||||
where
|
||||
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B1: rand::distr::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B2: rand::distr::uniform::SampleBorrow<Self::X> + Sized,
|
||||
{
|
||||
Self(
|
||||
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
|
||||
low.borrow().0,
|
||||
high.borrow().0,
|
||||
),
|
||||
<u64 as rand::distr::uniform::SampleUniform>::Sampler::new_inclusive(
|
||||
low.borrow().0,
|
||||
high.borrow().0,
|
||||
)
|
||||
.map(Self)
|
||||
}
|
||||
|
||||
fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
|
||||
|
||||
@@ -25,6 +25,12 @@ pub struct ShardIndex {
|
||||
pub shard_count: ShardCount,
|
||||
}
|
||||
|
||||
/// Stripe size as number of pages.
|
||||
///
|
||||
/// NB: don't implement Default, so callers don't lazily use it by mistake. See DEFAULT_STRIPE_SIZE.
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
|
||||
pub struct ShardStripeSize(pub u32);
|
||||
|
||||
/// Formatting helper, for generating the `shard_id` label in traces.
|
||||
pub struct ShardSlug<'a>(&'a TenantShardId);
|
||||
|
||||
@@ -177,6 +183,12 @@ impl std::fmt::Display for ShardCount {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardStripeSize {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardSlug<'_> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
|
||||
@@ -11,7 +11,8 @@ use pageserver::tenant::layer_map::LayerMap;
|
||||
use pageserver::tenant::storage_layer::{LayerName, PersistentLayerDesc};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use rand::prelude::{SeedableRng, StdRng};
|
||||
use rand::seq::IndexedRandom;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
|
||||
@@ -16,10 +16,9 @@ use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}
|
||||
use crate::retry::Retry;
|
||||
use crate::split::GetPageSplitter;
|
||||
use compute_api::spec::PageserverProtocol;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use pageserver_page_api as page_api;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
|
||||
use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};
|
||||
|
||||
/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
|
||||
/// when full.
|
||||
@@ -141,8 +140,8 @@ impl PageserverClient {
|
||||
if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
|
||||
return Err(anyhow!(
|
||||
"can't change stripe size from {} to {}",
|
||||
old.stripe_size,
|
||||
shard_spec.stripe_size
|
||||
old.stripe_size.expect("always Some when sharded"),
|
||||
shard_spec.stripe_size.expect("always Some when sharded")
|
||||
));
|
||||
}
|
||||
|
||||
@@ -157,23 +156,6 @@ impl PageserverClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns whether a relation exists.
|
||||
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
|
||||
pub async fn check_rel_exists(
|
||||
&self,
|
||||
req: page_api::CheckRelExistsRequest,
|
||||
) -> tonic::Result<page_api::CheckRelExistsResponse> {
|
||||
debug!("sending request: {req:?}");
|
||||
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let mut client = self.shards.load_full().get_zero().client().await?;
|
||||
Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await
|
||||
})
|
||||
.await?;
|
||||
debug!("received response: {resp:?}");
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
/// Returns the total size of a database, as # of bytes.
|
||||
#[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
|
||||
pub async fn get_db_size(
|
||||
@@ -249,13 +231,15 @@ impl PageserverClient {
|
||||
// Fast path: request is for a single shard.
|
||||
if let Some(shard_id) =
|
||||
GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
|
||||
.map_err(|err| tonic::Status::internal(err.to_string()))?
|
||||
{
|
||||
return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
|
||||
}
|
||||
|
||||
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
|
||||
// reassemble the responses.
|
||||
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
|
||||
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)
|
||||
.map_err(|err| tonic::Status::internal(err.to_string()))?;
|
||||
|
||||
let mut shard_requests = FuturesUnordered::new();
|
||||
for (shard_id, shard_req) in splitter.drain_requests() {
|
||||
@@ -265,10 +249,14 @@ impl PageserverClient {
|
||||
}
|
||||
|
||||
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
|
||||
splitter.add_response(shard_id, shard_response)?;
|
||||
splitter
|
||||
.add_response(shard_id, shard_response)
|
||||
.map_err(|err| tonic::Status::internal(err.to_string()))?;
|
||||
}
|
||||
|
||||
splitter.get_response()
|
||||
splitter
|
||||
.get_response()
|
||||
.map_err(|err| tonic::Status::internal(err.to_string()))
|
||||
}
|
||||
|
||||
/// Fetches pages on the given shard. Does not retry internally.
|
||||
@@ -396,12 +384,14 @@ pub struct ShardSpec {
|
||||
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
|
||||
count: ShardCount,
|
||||
/// The stripe size for these shards.
|
||||
stripe_size: ShardStripeSize,
|
||||
///
|
||||
/// INVARIANT: None for unsharded tenants, Some for sharded.
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
}
|
||||
|
||||
impl ShardSpec {
|
||||
/// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
|
||||
/// The stripe size may be omitted for unsharded tenants.
|
||||
/// The stripe size must be Some for sharded tenants, or None for unsharded tenants.
|
||||
pub fn new(
|
||||
urls: HashMap<ShardIndex, String>,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
@@ -414,11 +404,13 @@ impl ShardSpec {
|
||||
n => ShardCount::new(n as u8),
|
||||
};
|
||||
|
||||
// Determine the stripe size. It doesn't matter for unsharded tenants.
|
||||
// Validate the stripe size.
|
||||
if stripe_size.is_none() && !count.is_unsharded() {
|
||||
return Err(anyhow!("stripe size must be given for sharded tenants"));
|
||||
}
|
||||
let stripe_size = stripe_size.unwrap_or_default();
|
||||
if stripe_size.is_some() && count.is_unsharded() {
|
||||
return Err(anyhow!("stripe size can't be given for unsharded tenants"));
|
||||
}
|
||||
|
||||
// Validate the shard spec.
|
||||
for (shard_id, url) in &urls {
|
||||
@@ -458,8 +450,10 @@ struct Shards {
|
||||
///
|
||||
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
|
||||
count: ShardCount,
|
||||
/// The stripe size. Only used for sharded tenants.
|
||||
stripe_size: ShardStripeSize,
|
||||
/// The stripe size.
|
||||
///
|
||||
/// INVARIANT: None for unsharded tenants, Some for sharded.
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
}
|
||||
|
||||
impl Shards {
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use bytes::Bytes;
|
||||
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
|
||||
use pageserver_api::shard::key_to_shard_number;
|
||||
use pageserver_page_api as page_api;
|
||||
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
|
||||
use utils::shard::{ShardCount, ShardIndex, ShardStripeSize};
|
||||
|
||||
/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
|
||||
/// TODO: add tests for this.
|
||||
@@ -25,43 +26,54 @@ impl GetPageSplitter {
|
||||
pub fn for_single_shard(
|
||||
req: &page_api::GetPageRequest,
|
||||
count: ShardCount,
|
||||
stripe_size: ShardStripeSize,
|
||||
) -> Option<ShardIndex> {
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
) -> anyhow::Result<Option<ShardIndex>> {
|
||||
// Fast path: unsharded tenant.
|
||||
if count.is_unsharded() {
|
||||
return Some(ShardIndex::unsharded());
|
||||
return Ok(Some(ShardIndex::unsharded()));
|
||||
}
|
||||
|
||||
// Find the first page's shard, for comparison. If there are no pages, just return the first
|
||||
// shard (caller likely checked already, otherwise the server will reject it).
|
||||
let Some(stripe_size) = stripe_size else {
|
||||
return Err(anyhow!("stripe size must be given for sharded tenants"));
|
||||
};
|
||||
|
||||
// Find the first page's shard, for comparison.
|
||||
let Some(&first_page) = req.block_numbers.first() else {
|
||||
return Some(ShardIndex::new(ShardNumber(0), count));
|
||||
return Err(anyhow!("no block numbers in request"));
|
||||
};
|
||||
let key = rel_block_to_key(req.rel, first_page);
|
||||
let shard_number = key_to_shard_number(count, stripe_size, &key);
|
||||
|
||||
req.block_numbers
|
||||
Ok(req
|
||||
.block_numbers
|
||||
.iter()
|
||||
.skip(1) // computed above
|
||||
.all(|&blkno| {
|
||||
let key = rel_block_to_key(req.rel, blkno);
|
||||
key_to_shard_number(count, stripe_size, &key) == shard_number
|
||||
})
|
||||
.then_some(ShardIndex::new(shard_number, count))
|
||||
.then_some(ShardIndex::new(shard_number, count)))
|
||||
}
|
||||
|
||||
/// Splits the given request.
|
||||
pub fn split(
|
||||
req: page_api::GetPageRequest,
|
||||
count: ShardCount,
|
||||
stripe_size: ShardStripeSize,
|
||||
) -> Self {
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
) -> anyhow::Result<Self> {
|
||||
// The caller should make sure we don't split requests unnecessarily.
|
||||
debug_assert!(
|
||||
Self::for_single_shard(&req, count, stripe_size).is_none(),
|
||||
Self::for_single_shard(&req, count, stripe_size)?.is_none(),
|
||||
"unnecessary request split"
|
||||
);
|
||||
|
||||
if count.is_unsharded() {
|
||||
return Err(anyhow!("unsharded tenant, no point in splitting request"));
|
||||
}
|
||||
let Some(stripe_size) = stripe_size else {
|
||||
return Err(anyhow!("stripe size must be given for sharded tenants"));
|
||||
};
|
||||
|
||||
// Split the requests by shard index.
|
||||
let mut requests = HashMap::with_capacity(2); // common case
|
||||
let mut block_shards = Vec::with_capacity(req.block_numbers.len());
|
||||
@@ -103,11 +115,11 @@ impl GetPageSplitter {
|
||||
.collect(),
|
||||
};
|
||||
|
||||
Self {
|
||||
Ok(Self {
|
||||
requests,
|
||||
response,
|
||||
block_shards,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
|
||||
@@ -124,21 +136,30 @@ impl GetPageSplitter {
|
||||
&mut self,
|
||||
shard_id: ShardIndex,
|
||||
response: page_api::GetPageResponse,
|
||||
) -> tonic::Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
// The caller should already have converted status codes into tonic::Status.
|
||||
if response.status_code != page_api::GetPageStatusCode::Ok {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
return Err(anyhow!(
|
||||
"unexpected non-OK response for shard {shard_id}: {} {}",
|
||||
response.status_code,
|
||||
response.reason.unwrap_or_default()
|
||||
)));
|
||||
));
|
||||
}
|
||||
|
||||
if response.request_id != self.response.request_id {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
return Err(anyhow!(
|
||||
"response ID mismatch for shard {shard_id}: expected {}, got {}",
|
||||
self.response.request_id, response.request_id
|
||||
)));
|
||||
self.response.request_id,
|
||||
response.request_id
|
||||
));
|
||||
}
|
||||
|
||||
if response.request_id != self.response.request_id {
|
||||
return Err(anyhow!(
|
||||
"response ID mismatch for shard {shard_id}: expected {}, got {}",
|
||||
self.response.request_id,
|
||||
response.request_id
|
||||
));
|
||||
}
|
||||
|
||||
// Place the shard response pages into the assembled response, in request order.
|
||||
@@ -150,27 +171,26 @@ impl GetPageSplitter {
|
||||
}
|
||||
|
||||
let Some(slot) = self.response.pages.get_mut(i) else {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"no block_shards slot {i} for shard {shard_id}"
|
||||
)));
|
||||
return Err(anyhow!("no block_shards slot {i} for shard {shard_id}"));
|
||||
};
|
||||
let Some(page) = pages.next() else {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
return Err(anyhow!(
|
||||
"missing page {} in shard {shard_id} response",
|
||||
slot.block_number
|
||||
)));
|
||||
));
|
||||
};
|
||||
if page.block_number != slot.block_number {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
return Err(anyhow!(
|
||||
"shard {shard_id} returned wrong page at index {i}, expected {} got {}",
|
||||
slot.block_number, page.block_number
|
||||
)));
|
||||
slot.block_number,
|
||||
page.block_number
|
||||
));
|
||||
}
|
||||
if !slot.image.is_empty() {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
return Err(anyhow!(
|
||||
"shard {shard_id} returned duplicate page {} at index {i}",
|
||||
slot.block_number
|
||||
)));
|
||||
));
|
||||
}
|
||||
|
||||
*slot = page;
|
||||
@@ -178,10 +198,10 @@ impl GetPageSplitter {
|
||||
|
||||
// Make sure we've consumed all pages from the shard response.
|
||||
if let Some(extra_page) = pages.next() {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
return Err(anyhow!(
|
||||
"shard {shard_id} returned extra page: {}",
|
||||
extra_page.block_number
|
||||
)));
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -189,18 +209,18 @@ impl GetPageSplitter {
|
||||
|
||||
/// Fetches the final, assembled response.
|
||||
#[allow(clippy::result_large_err)]
|
||||
pub fn get_response(self) -> tonic::Result<page_api::GetPageResponse> {
|
||||
pub fn get_response(self) -> anyhow::Result<page_api::GetPageResponse> {
|
||||
// Check that the response is complete.
|
||||
for (i, page) in self.response.pages.iter().enumerate() {
|
||||
if page.image.is_empty() {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
return Err(anyhow!(
|
||||
"missing page {} for shard {}",
|
||||
page.block_number,
|
||||
self.block_shards
|
||||
.get(i)
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| "?".to_string())
|
||||
)));
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +89,7 @@ async fn simulate(cmd: &SimulateCmd, results_path: &Path) -> anyhow::Result<()>
|
||||
let cold_key_range = splitpoint..key_range.end;
|
||||
|
||||
for i in 0..cmd.num_records {
|
||||
let chosen_range = if rand::thread_rng().gen_bool(0.9) {
|
||||
let chosen_range = if rand::rng().random_bool(0.9) {
|
||||
&hot_key_range
|
||||
} else {
|
||||
&cold_key_range
|
||||
|
||||
@@ -300,9 +300,9 @@ impl MockTimeline {
|
||||
key_range: &Range<Key>,
|
||||
) -> anyhow::Result<()> {
|
||||
crate::helpers::union_to_keyspace(&mut self.keyspace, vec![key_range.clone()]);
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut rng = rand::rng();
|
||||
for _ in 0..num_records {
|
||||
self.ingest_record(rng.gen_range(key_range.clone()), len);
|
||||
self.ingest_record(rng.random_range(key_range.clone()), len);
|
||||
self.wal_ingested += len;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -4,7 +4,7 @@ use anyhow::Context;
|
||||
use clap::Parser;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use pageserver_api::shard::{ShardCount, ShardStripeSize};
|
||||
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize};
|
||||
|
||||
#[derive(Parser)]
|
||||
pub(super) struct DescribeKeyCommand {
|
||||
@@ -128,7 +128,9 @@ impl DescribeKeyCommand {
|
||||
// seeing the sharding placement might be confusing, so leave it out unless shard
|
||||
// count was given.
|
||||
|
||||
let stripe_size = stripe_size.map(ShardStripeSize).unwrap_or_default();
|
||||
let stripe_size = stripe_size
|
||||
.map(ShardStripeSize)
|
||||
.unwrap_or(DEFAULT_STRIPE_SIZE);
|
||||
println!(
|
||||
"# placement with shard_count: {} and stripe_size: {}:",
|
||||
shard_count.0, stripe_size.0
|
||||
|
||||
@@ -17,11 +17,11 @@
|
||||
// grpcurl \
|
||||
// -plaintext \
|
||||
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
|
||||
// -H "neon-shard-id: 0b10" \
|
||||
// -H "neon-shard-id: 0000" \
|
||||
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
|
||||
// -H "authorization: Bearer $JWT" \
|
||||
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}'
|
||||
// localhost:51051 page_api.PageService/CheckRelExists
|
||||
// -d '{"read_lsn": {"request_lsn": 100000000, "not_modified_since_lsn": 1}, "db_oid": 1}' \
|
||||
// localhost:51051 page_api.PageService/GetDbSize
|
||||
// ```
|
||||
//
|
||||
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
|
||||
@@ -38,8 +38,8 @@ package page_api;
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
service PageService {
|
||||
// Returns whether a relation exists.
|
||||
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
|
||||
// NB: unlike libpq, there is no CheckRelExists in gRPC, at the compute team's request. Instead,
|
||||
// use GetRelSize with allow_missing=true to check existence.
|
||||
|
||||
// Fetches a base backup.
|
||||
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
|
||||
@@ -97,17 +97,6 @@ message RelTag {
|
||||
uint32 fork_number = 4;
|
||||
}
|
||||
|
||||
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
|
||||
// other shards will error.
|
||||
message CheckRelExistsRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
}
|
||||
|
||||
message CheckRelExistsResponse {
|
||||
bool exists = 1;
|
||||
}
|
||||
|
||||
// Requests a base backup.
|
||||
message GetBaseBackupRequest {
|
||||
// The LSN to fetch the base backup at. 0 or absent means the latest LSN known to the Pageserver.
|
||||
@@ -260,10 +249,15 @@ enum GetPageStatusCode {
|
||||
message GetRelSizeRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
// If true, return missing=true for missing relations instead of a NotFound error.
|
||||
bool allow_missing = 3;
|
||||
}
|
||||
|
||||
message GetRelSizeResponse {
|
||||
// The number of blocks in the relation.
|
||||
uint32 num_blocks = 1;
|
||||
// If allow_missing=true, this is true for missing relations.
|
||||
bool missing = 2;
|
||||
}
|
||||
|
||||
// Requests an SLRU segment. Only valid on shard 0, other shards will error.
|
||||
|
||||
@@ -69,16 +69,6 @@ impl Client {
|
||||
Ok(Self { inner })
|
||||
}
|
||||
|
||||
/// Returns whether a relation exists.
|
||||
pub async fn check_rel_exists(
|
||||
&mut self,
|
||||
req: CheckRelExistsRequest,
|
||||
) -> tonic::Result<CheckRelExistsResponse> {
|
||||
let req = proto::CheckRelExistsRequest::from(req);
|
||||
let resp = self.inner.check_rel_exists(req).await?.into_inner();
|
||||
Ok(resp.into())
|
||||
}
|
||||
|
||||
/// Fetches a base backup.
|
||||
pub async fn get_base_backup(
|
||||
&mut self,
|
||||
@@ -114,7 +104,8 @@ impl Client {
|
||||
Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into()))))
|
||||
}
|
||||
|
||||
/// Returns the size of a relation, as # of blocks.
|
||||
/// Returns the size of a relation as # of blocks, or None if allow_missing=true and the
|
||||
/// relation does not exist.
|
||||
pub async fn get_rel_size(
|
||||
&mut self,
|
||||
req: GetRelSizeRequest,
|
||||
|
||||
@@ -139,50 +139,6 @@ impl From<RelTag> for proto::RelTag {
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct CheckRelExistsRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::CheckRelExistsRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CheckRelExistsRequest> for proto::CheckRelExistsRequest {
|
||||
fn from(request: CheckRelExistsRequest) -> Self {
|
||||
Self {
|
||||
read_lsn: Some(request.read_lsn.into()),
|
||||
rel: Some(request.rel.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type CheckRelExistsResponse = bool;
|
||||
|
||||
impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
|
||||
fn from(pb: proto::CheckRelExistsResponse) -> Self {
|
||||
pb.exists
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
|
||||
fn from(exists: CheckRelExistsResponse) -> Self {
|
||||
Self { exists }
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests a base backup.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GetBaseBackupRequest {
|
||||
@@ -707,6 +663,8 @@ impl From<GetPageStatusCode> for tonic::Code {
|
||||
pub struct GetRelSizeRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub rel: RelTag,
|
||||
/// If true, return missing=true for missing relations instead of a NotFound error.
|
||||
pub allow_missing: bool,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
|
||||
@@ -719,6 +677,7 @@ impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
|
||||
allow_missing: proto.allow_missing,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -728,21 +687,29 @@ impl From<GetRelSizeRequest> for proto::GetRelSizeRequest {
|
||||
Self {
|
||||
read_lsn: Some(request.read_lsn.into()),
|
||||
rel: Some(request.rel.into()),
|
||||
allow_missing: request.allow_missing,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type GetRelSizeResponse = u32;
|
||||
/// The size of a relation as number of blocks, or None if `allow_missing=true` and the relation
|
||||
/// does not exist.
|
||||
///
|
||||
/// INVARIANT: never None if `allow_missing=false` (returns `NotFound` error instead).
|
||||
pub type GetRelSizeResponse = Option<u32>;
|
||||
|
||||
impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
|
||||
fn from(proto: proto::GetRelSizeResponse) -> Self {
|
||||
proto.num_blocks
|
||||
fn from(pb: proto::GetRelSizeResponse) -> Self {
|
||||
(!pb.missing).then_some(pb.num_blocks)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
|
||||
fn from(num_blocks: GetRelSizeResponse) -> Self {
|
||||
Self { num_blocks }
|
||||
fn from(resp: GetRelSizeResponse) -> Self {
|
||||
Self {
|
||||
num_blocks: resp.unwrap_or_default(),
|
||||
missing: resp.is_none(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -188,9 +188,9 @@ async fn main_impl(
|
||||
start_work_barrier.wait().await;
|
||||
loop {
|
||||
let (timeline, work) = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut rng = rand::rng();
|
||||
let target = all_targets.choose(&mut rng).unwrap();
|
||||
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
|
||||
let lsn = target.lsn_range.clone().map(|r| rng.random_range(r));
|
||||
(target.timeline, Work { lsn })
|
||||
};
|
||||
let sender = work_senders.get(&timeline).unwrap();
|
||||
|
||||
@@ -326,8 +326,7 @@ async fn main_impl(
|
||||
.cloned()
|
||||
.collect();
|
||||
let weights =
|
||||
rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
|
||||
.unwrap();
|
||||
rand::distr::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len())).unwrap();
|
||||
|
||||
Box::pin(async move {
|
||||
let scheme = match Url::parse(&args.page_service_connstring) {
|
||||
@@ -427,7 +426,7 @@ async fn run_worker(
|
||||
cancel: CancellationToken,
|
||||
rps_period: Option<Duration>,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
weights: rand::distr::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
@@ -469,9 +468,9 @@ async fn run_worker(
|
||||
}
|
||||
|
||||
// Pick a random page from a random relation.
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut rng = rand::rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key: i128 = rng.random_range(r.start..r.end);
|
||||
let (rel_tag, block_no) = key_to_block(key);
|
||||
|
||||
let mut blks = VecDeque::with_capacity(batch_size);
|
||||
@@ -502,7 +501,7 @@ async fn run_worker(
|
||||
// We assume that the entire batch can fit within the relation.
|
||||
assert_eq!(blks.len(), batch_size, "incomplete batch");
|
||||
|
||||
let req_lsn = if rng.gen_bool(args.req_latest_probability) {
|
||||
let req_lsn = if rng.random_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::time::{Duration, Instant};
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::seq::IndexedMutRandom;
|
||||
use tokio::sync::{OwnedSemaphorePermit, mpsc};
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -260,7 +260,7 @@ async fn timeline_actor(
|
||||
|
||||
loop {
|
||||
let layer_tx = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut rng = rand::rng();
|
||||
timeline.layers.choose_mut(&mut rng).expect("no layers")
|
||||
};
|
||||
match layer_tx.try_send(permit.take().unwrap()) {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::IpAddr;
|
||||
|
||||
use futures::Future;
|
||||
use pageserver_api::config::NodeMetadata;
|
||||
@@ -16,7 +17,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use url::Url;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::{backoff, failpoint_support};
|
||||
use utils::{backoff, failpoint_support, ip_address};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
@@ -27,6 +28,7 @@ pub struct StorageControllerUpcallClient {
|
||||
http_client: reqwest::Client,
|
||||
base_url: Url,
|
||||
node_id: NodeId,
|
||||
node_ip_addr: Option<IpAddr>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
@@ -40,6 +42,7 @@ pub trait StorageControllerUpcallApi {
|
||||
fn re_attach(
|
||||
&self,
|
||||
conf: &PageServerConf,
|
||||
empty_local_disk: bool,
|
||||
) -> impl Future<
|
||||
Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
|
||||
> + Send;
|
||||
@@ -91,11 +94,18 @@ impl StorageControllerUpcallClient {
|
||||
);
|
||||
}
|
||||
|
||||
// Intentionally panics if we encountered any errors parsing or reading the IP address.
|
||||
// Note that if the required environment variable is not set, `read_node_ip_addr_from_env` returns `Ok(None)`
|
||||
// instead of an error.
|
||||
let node_ip_addr =
|
||||
ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");
|
||||
|
||||
Self {
|
||||
http_client: client.build().expect("Failed to construct HTTP client"),
|
||||
base_url: url,
|
||||
node_id: conf.id,
|
||||
cancel: cancel.clone(),
|
||||
node_ip_addr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,6 +156,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
|
||||
async fn re_attach(
|
||||
&self,
|
||||
conf: &PageServerConf,
|
||||
empty_local_disk: bool,
|
||||
) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
|
||||
let url = self
|
||||
.base_url
|
||||
@@ -193,8 +204,8 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
|
||||
listen_http_addr: m.http_host,
|
||||
listen_http_port: m.http_port,
|
||||
listen_https_port: m.https_port,
|
||||
node_ip_addr: self.node_ip_addr,
|
||||
availability_zone_id: az_id.expect("Checked above"),
|
||||
node_ip_addr: None,
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -217,6 +228,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
|
||||
let request = ReAttachRequest {
|
||||
node_id: self.node_id,
|
||||
register: register.clone(),
|
||||
empty_local_disk: Some(empty_local_disk),
|
||||
};
|
||||
|
||||
let response: ReAttachResponse = self
|
||||
|
||||
@@ -768,6 +768,7 @@ mod test {
|
||||
async fn re_attach(
|
||||
&self,
|
||||
_conf: &PageServerConf,
|
||||
_empty_local_disk: bool,
|
||||
) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
@@ -155,7 +155,7 @@ impl FeatureResolver {
|
||||
);
|
||||
|
||||
let tenant_properties = PerTenantProperties {
|
||||
remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
|
||||
remote_size_mb: Some(rand::rng().random_range(100.0..1000000.00)),
|
||||
}
|
||||
.into_posthog_properties();
|
||||
|
||||
|
||||
@@ -1636,9 +1636,10 @@ impl PageServerHandler {
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![
|
||||
Self::handle_get_nblocks_request(&shard, &req, &ctx)
|
||||
Self::handle_get_nblocks_request(&shard, &req, false, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| msg.expect("allow_missing=false"))
|
||||
.map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
@@ -2303,12 +2304,16 @@ impl PageServerHandler {
|
||||
Ok(PagestreamExistsResponse { req: *req, exists })
|
||||
}
|
||||
|
||||
/// If `allow_missing` is true, returns None instead of Err on missing relations. Otherwise,
|
||||
/// never returns None. It is only supported by the gRPC protocol, so we pass it separately to
|
||||
/// avoid changing the libpq protocol types.
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_nblocks_request(
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamNblocksRequest,
|
||||
allow_missing: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamNblocksResponse, PageStreamError> {
|
||||
) -> Result<Option<PagestreamNblocksResponse>, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
@@ -2320,20 +2325,25 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
let n_blocks = timeline
|
||||
.get_rel_size(
|
||||
.get_rel_size_in_reldir(
|
||||
req.rel,
|
||||
Version::LsnRange(LsnRange {
|
||||
effective_lsn: lsn,
|
||||
request_lsn: req.hdr.request_lsn,
|
||||
}),
|
||||
None,
|
||||
allow_missing,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let Some(n_blocks) = n_blocks else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
Ok(PagestreamNblocksResponse {
|
||||
Ok(Some(PagestreamNblocksResponse {
|
||||
req: *req,
|
||||
n_blocks,
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
@@ -3218,13 +3228,25 @@ where
|
||||
pub struct GrpcPageServiceHandler {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
ctx: RequestContext,
|
||||
|
||||
/// Cancelled to shut down the server. Tonic will shut down in response to this, but wait for
|
||||
/// in-flight requests to complete. Any tasks we spawn ourselves must respect this token.
|
||||
cancel: CancellationToken,
|
||||
|
||||
/// Any tasks we spawn ourselves should clone this gate guard, so that we can wait for them to
|
||||
/// complete during shutdown. Request handlers implicitly hold this guard already.
|
||||
gate_guard: GateGuard,
|
||||
|
||||
/// `get_vectored` concurrency setting.
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
}
|
||||
|
||||
impl GrpcPageServiceHandler {
|
||||
/// Spawns a gRPC server for the page service.
|
||||
///
|
||||
/// Returns a `CancellableTask` handle that can be used to shut down the server. It waits for
|
||||
/// any in-flight requests and tasks to complete first.
|
||||
///
|
||||
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
|
||||
/// need to reimplement the TCP+TLS accept loop ourselves.
|
||||
pub fn spawn(
|
||||
@@ -3234,12 +3256,15 @@ impl GrpcPageServiceHandler {
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
listener: std::net::TcpListener,
|
||||
) -> anyhow::Result<CancellableTask> {
|
||||
// Set up a cancellation token for shutting down the server, and a gate to wait for all
|
||||
// requests and spawned tasks to complete.
|
||||
let cancel = CancellationToken::new();
|
||||
let gate = Gate::default();
|
||||
|
||||
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.perf_span_dispatch(perf_trace_dispatch)
|
||||
.detached_child();
|
||||
let gate = Gate::default();
|
||||
|
||||
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
|
||||
// port early during startup.
|
||||
@@ -3270,6 +3295,7 @@ impl GrpcPageServiceHandler {
|
||||
let page_service_handler = GrpcPageServiceHandler {
|
||||
tenant_manager,
|
||||
ctx,
|
||||
cancel: cancel.clone(),
|
||||
gate_guard: gate.enter().expect("gate was just created"),
|
||||
get_vectored_concurrent_io,
|
||||
};
|
||||
@@ -3306,19 +3332,20 @@ impl GrpcPageServiceHandler {
|
||||
.build_v1()?;
|
||||
let server = server.add_service(reflection_service);
|
||||
|
||||
// Spawn server task.
|
||||
// Spawn server task. It runs until the cancellation token fires and in-flight requests and
|
||||
// tasks complete. The `CancellableTask` will wait for the task's join handle, which
|
||||
// implicitly waits for the gate to close.
|
||||
let task_cancel = cancel.clone();
|
||||
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
|
||||
"grpc listener",
|
||||
"grpc pageservice listener",
|
||||
async move {
|
||||
let result = server
|
||||
server
|
||||
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
|
||||
.await;
|
||||
if result.is_ok() {
|
||||
// TODO: revisit shutdown logic once page service is implemented.
|
||||
gate.close().await;
|
||||
}
|
||||
result
|
||||
.await?;
|
||||
// Server exited cleanly. All requests should have completed by now. Wait for any
|
||||
// spawned tasks to complete as well (e.g. IoConcurrency sidecars) via the gate.
|
||||
gate.close().await;
|
||||
anyhow::Ok(())
|
||||
},
|
||||
));
|
||||
|
||||
@@ -3508,7 +3535,10 @@ impl GrpcPageServiceHandler {
|
||||
|
||||
/// Implements the gRPC page service.
|
||||
///
|
||||
/// TODO: cancellation.
|
||||
/// On client disconnect (e.g. timeout or client shutdown), Tonic will drop the request handler
|
||||
/// futures, so the read path must be cancellation-safe. On server shutdown, Tonic will wait for
|
||||
/// in-flight requests to complete.
|
||||
///
|
||||
/// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code.
|
||||
#[tonic::async_trait]
|
||||
impl proto::PageService for GrpcPageServiceHandler {
|
||||
@@ -3519,39 +3549,6 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
type GetPagesStream =
|
||||
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
|
||||
|
||||
#[instrument(skip_all, fields(rel, lsn))]
|
||||
async fn check_rel_exists(
|
||||
&self,
|
||||
req: tonic::Request<proto::CheckRelExistsRequest>,
|
||||
) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
|
||||
let received_at = extract::<ReceivedAt>(&req).0;
|
||||
let timeline = self.get_request_timeline(&req).await?;
|
||||
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
|
||||
|
||||
// Validate the request, decorate the span, and convert it to a Pagestream request.
|
||||
Self::ensure_shard_zero(&timeline)?;
|
||||
let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?;
|
||||
|
||||
span_record!(rel=%req.rel, lsn=%req.read_lsn);
|
||||
|
||||
let req = PagestreamExistsRequest {
|
||||
hdr: Self::make_hdr(req.read_lsn, None),
|
||||
rel: req.rel,
|
||||
};
|
||||
|
||||
// Execute the request and convert the response.
|
||||
let _timer = Self::record_op_start_and_throttle(
|
||||
&timeline,
|
||||
metrics::SmgrQueryType::GetRelExists,
|
||||
received_at,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let resp = PageServerHandler::handle_get_rel_exists_request(&timeline, &req, &ctx).await?;
|
||||
let resp: page_api::CheckRelExistsResponse = resp.exists;
|
||||
Ok(tonic::Response::new(resp.into()))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(lsn))]
|
||||
async fn get_base_backup(
|
||||
&self,
|
||||
@@ -3593,8 +3590,14 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
|
||||
// Spawn a task to run the basebackup.
|
||||
let span = Span::current();
|
||||
let gate_guard = self
|
||||
.gate_guard
|
||||
.try_clone()
|
||||
.map_err(|_| tonic::Status::unavailable("shutting down"))?;
|
||||
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
|
||||
let jh = tokio::spawn(async move {
|
||||
let _gate_guard = gate_guard; // keep gate open until task completes
|
||||
|
||||
let gzip_level = match req.compression {
|
||||
page_api::BaseBackupCompression::None => None,
|
||||
// NB: using fast compression because it's on the critical path for compute
|
||||
@@ -3718,15 +3721,17 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
.await?;
|
||||
|
||||
// Spawn an IoConcurrency sidecar, if enabled.
|
||||
let Ok(gate_guard) = self.gate_guard.try_clone() else {
|
||||
return Err(tonic::Status::unavailable("shutting down"));
|
||||
};
|
||||
let gate_guard = self
|
||||
.gate_guard
|
||||
.try_clone()
|
||||
.map_err(|_| tonic::Status::unavailable("shutting down"))?;
|
||||
let io_concurrency =
|
||||
IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard);
|
||||
|
||||
// Spawn a task to handle the GetPageRequest stream.
|
||||
// Construct the GetPageRequest stream handler.
|
||||
let span = Span::current();
|
||||
let ctx = self.ctx.attached_child();
|
||||
let cancel = self.cancel.clone();
|
||||
let mut reqs = req.into_inner();
|
||||
|
||||
let resps = async_stream::try_stream! {
|
||||
@@ -3734,7 +3739,19 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
|
||||
.await?
|
||||
.downgrade();
|
||||
while let Some(req) = reqs.message().await? {
|
||||
loop {
|
||||
// NB: Tonic considers the entire stream to be an in-flight request and will wait
|
||||
// for it to complete before shutting down. React to cancellation between requests.
|
||||
let req = tokio::select! {
|
||||
biased;
|
||||
_ = cancel.cancelled() => Err(tonic::Status::unavailable("shutting down")),
|
||||
|
||||
result = reqs.message() => match result {
|
||||
Ok(Some(req)) => Ok(req),
|
||||
Ok(None) => break, // client closed the stream
|
||||
Err(err) => Err(err),
|
||||
},
|
||||
}?;
|
||||
let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default();
|
||||
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
|
||||
.instrument(span.clone()) // propagate request span
|
||||
@@ -3758,7 +3775,7 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
Ok(tonic::Response::new(Box::pin(resps)))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(rel, lsn))]
|
||||
#[instrument(skip_all, fields(rel, lsn, allow_missing))]
|
||||
async fn get_rel_size(
|
||||
&self,
|
||||
req: tonic::Request<proto::GetRelSizeRequest>,
|
||||
@@ -3770,8 +3787,9 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
// Validate the request, decorate the span, and convert it to a Pagestream request.
|
||||
Self::ensure_shard_zero(&timeline)?;
|
||||
let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?;
|
||||
let allow_missing = req.allow_missing;
|
||||
|
||||
span_record!(rel=%req.rel, lsn=%req.read_lsn);
|
||||
span_record!(rel=%req.rel, lsn=%req.read_lsn, allow_missing=%req.allow_missing);
|
||||
|
||||
let req = PagestreamNblocksRequest {
|
||||
hdr: Self::make_hdr(req.read_lsn, None),
|
||||
@@ -3786,8 +3804,11 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let resp = PageServerHandler::handle_get_nblocks_request(&timeline, &req, &ctx).await?;
|
||||
let resp: page_api::GetRelSizeResponse = resp.n_blocks;
|
||||
let resp =
|
||||
PageServerHandler::handle_get_nblocks_request(&timeline, &req, allow_missing, &ctx)
|
||||
.await?;
|
||||
let resp: page_api::GetRelSizeResponse = resp.map(|resp| resp.n_blocks);
|
||||
|
||||
Ok(tonic::Response::new(resp.into()))
|
||||
}
|
||||
|
||||
|
||||
@@ -286,6 +286,10 @@ impl Timeline {
|
||||
/// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
|
||||
///
|
||||
/// The ordering of the returned vec corresponds to the ordering of `pages`.
|
||||
///
|
||||
/// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
|
||||
/// if the client goes away (e.g. due to timeout or cancellation).
|
||||
/// TODO: verify that it actually is cancellation-safe.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
|
||||
@@ -500,8 +504,9 @@ impl Timeline {
|
||||
|
||||
for rel in rels {
|
||||
let n_blocks = self
|
||||
.get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), ctx)
|
||||
.await?;
|
||||
.get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), false, ctx)
|
||||
.await?
|
||||
.expect("allow_missing=false");
|
||||
total_blocks += n_blocks as usize;
|
||||
}
|
||||
Ok(total_blocks)
|
||||
@@ -517,10 +522,16 @@ impl Timeline {
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
self.get_rel_size_in_reldir(tag, version, None, ctx).await
|
||||
Ok(self
|
||||
.get_rel_size_in_reldir(tag, version, None, false, ctx)
|
||||
.await?
|
||||
.expect("allow_missing=false"))
|
||||
}
|
||||
|
||||
/// Get size of a relation file. The relation must exist, otherwise an error is returned.
|
||||
/// Get size of a relation file. If `allow_missing` is true, returns None for missing relations,
|
||||
/// otherwise errors.
|
||||
///
|
||||
/// INVARIANT: never returns None if `allow_missing=false`.
|
||||
///
|
||||
/// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
|
||||
pub(crate) async fn get_rel_size_in_reldir(
|
||||
@@ -528,8 +539,9 @@ impl Timeline {
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
|
||||
allow_missing: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
) -> Result<Option<BlockNumber>, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
@@ -537,7 +549,15 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
|
||||
return Ok(nblocks);
|
||||
return Ok(Some(nblocks));
|
||||
}
|
||||
|
||||
if allow_missing
|
||||
&& !self
|
||||
.get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
|
||||
.await?
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
|
||||
@@ -549,7 +569,7 @@ impl Timeline {
|
||||
// FSM, and smgrnblocks() on it immediately afterwards,
|
||||
// without extending it. Tolerate that by claiming that
|
||||
// any non-existent FSM fork has size 0.
|
||||
return Ok(0);
|
||||
return Ok(Some(0));
|
||||
}
|
||||
|
||||
let key = rel_size_to_key(tag);
|
||||
@@ -558,7 +578,7 @@ impl Timeline {
|
||||
|
||||
self.update_cached_rel_size(tag, version, nblocks);
|
||||
|
||||
Ok(nblocks)
|
||||
Ok(Some(nblocks))
|
||||
}
|
||||
|
||||
/// Does the relation exist?
|
||||
@@ -2908,9 +2928,8 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
|
||||
mod tests {
|
||||
use hex_literal::hex;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use utils::id::TimelineId;
|
||||
use utils::shard::{ShardCount, ShardNumber};
|
||||
use utils::shard::{ShardCount, ShardNumber, ShardStripeSize};
|
||||
|
||||
use super::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
|
||||
@@ -6161,11 +6161,11 @@ mod tests {
|
||||
use pageserver_api::keyspace::KeySpaceRandomAccum;
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings, LsnLease};
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
use rand::Rng;
|
||||
#[cfg(feature = "testing")]
|
||||
use rand::SeedableRng;
|
||||
#[cfg(feature = "testing")]
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, thread_rng};
|
||||
#[cfg(feature = "testing")]
|
||||
use std::ops::Range;
|
||||
use storage_layer::{IoConcurrency, PersistentLayerKey};
|
||||
@@ -6286,8 +6286,8 @@ mod tests {
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
let gap = random.gen_range(1..=100) <= spec.gap_chance;
|
||||
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
|
||||
let gap = random.random_range(1..=100) <= spec.gap_chance;
|
||||
let will_init = random.random_range(1..=100) <= spec.will_init_chance;
|
||||
|
||||
if gap {
|
||||
continue;
|
||||
@@ -6330,8 +6330,8 @@ mod tests {
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
let gap = random.gen_range(1..=100) <= spec.gap_chance;
|
||||
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
|
||||
let gap = random.random_range(1..=100) <= spec.gap_chance;
|
||||
let will_init = random.random_range(1..=100) <= spec.will_init_chance;
|
||||
|
||||
if gap {
|
||||
continue;
|
||||
@@ -7808,7 +7808,7 @@ mod tests {
|
||||
for _ in 0..50 {
|
||||
for _ in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
let blknum = rand::rng().random_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
@@ -7897,7 +7897,7 @@ mod tests {
|
||||
|
||||
for _ in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
let blknum = rand::rng().random_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
@@ -7965,7 +7965,7 @@ mod tests {
|
||||
|
||||
for _ in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
let blknum = rand::rng().random_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
@@ -8229,7 +8229,7 @@ mod tests {
|
||||
|
||||
for _ in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
let blknum = rand::rng().random_range(0..NUM_KEYS);
|
||||
test_key.field6 = (blknum * STEP) as u32;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
@@ -8502,7 +8502,7 @@ mod tests {
|
||||
for iter in 1..=10 {
|
||||
for _ in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
let blknum = rand::rng().random_range(0..NUM_KEYS);
|
||||
test_key.field6 = (blknum * STEP) as u32;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
@@ -11291,10 +11291,10 @@ mod tests {
|
||||
#[cfg(feature = "testing")]
|
||||
#[tokio::test]
|
||||
async fn test_read_path() -> anyhow::Result<()> {
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::seq::IndexedRandom;
|
||||
|
||||
let seed = if cfg!(feature = "fuzz-read-path") {
|
||||
let seed: u64 = thread_rng().r#gen();
|
||||
let seed: u64 = rand::rng().random();
|
||||
seed
|
||||
} else {
|
||||
// Use a hard-coded seed when not in fuzzing mode.
|
||||
@@ -11308,8 +11308,8 @@ mod tests {
|
||||
|
||||
let (queries, will_init_chance, gap_chance) = if cfg!(feature = "fuzz-read-path") {
|
||||
const QUERIES: u64 = 5000;
|
||||
let will_init_chance: u8 = random.gen_range(0..=10);
|
||||
let gap_chance: u8 = random.gen_range(0..=50);
|
||||
let will_init_chance: u8 = random.random_range(0..=10);
|
||||
let gap_chance: u8 = random.random_range(0..=50);
|
||||
|
||||
(QUERIES, will_init_chance, gap_chance)
|
||||
} else {
|
||||
@@ -11410,7 +11410,8 @@ mod tests {
|
||||
|
||||
while used_keys.len() < tenant.conf.max_get_vectored_keys.get() {
|
||||
let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty");
|
||||
let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE));
|
||||
let mut selected_key =
|
||||
start_key.add(random.random_range(0..KEY_DIMENSION_SIZE));
|
||||
|
||||
while used_keys.len() < tenant.conf.max_get_vectored_keys.get() {
|
||||
if used_keys.contains(&selected_key)
|
||||
@@ -11425,7 +11426,7 @@ mod tests {
|
||||
.add_key(selected_key);
|
||||
used_keys.insert(selected_key);
|
||||
|
||||
let pick_next = random.gen_range(0..=100) <= PICK_NEXT_CHANCE;
|
||||
let pick_next = random.random_range(0..=100) <= PICK_NEXT_CHANCE;
|
||||
if pick_next {
|
||||
selected_key = selected_key.next();
|
||||
} else {
|
||||
|
||||
@@ -535,8 +535,8 @@ pub(crate) mod tests {
|
||||
}
|
||||
|
||||
pub(crate) fn random_array(len: usize) -> Vec<u8> {
|
||||
let mut rng = rand::thread_rng();
|
||||
(0..len).map(|_| rng.r#gen()).collect::<_>()
|
||||
let mut rng = rand::rng();
|
||||
(0..len).map(|_| rng.random()).collect::<_>()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -588,9 +588,9 @@ pub(crate) mod tests {
|
||||
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
|
||||
let blobs = (0..1024)
|
||||
.map(|_| {
|
||||
let mut sz: u16 = rng.r#gen();
|
||||
let mut sz: u16 = rng.random();
|
||||
// Make 50% of the arrays small
|
||||
if rng.r#gen() {
|
||||
if rng.random() {
|
||||
sz &= 63;
|
||||
}
|
||||
random_array(sz.into())
|
||||
|
||||
@@ -1090,7 +1090,7 @@ pub(crate) mod tests {
|
||||
const NUM_KEYS: usize = 100000;
|
||||
let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
|
||||
for idx in 0..NUM_KEYS {
|
||||
let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
|
||||
let u: f64 = rand::rng().random_range(0.0..1.0);
|
||||
let t = -(f64::ln(u));
|
||||
let key_int = (t * 1000000.0) as u128;
|
||||
|
||||
@@ -1116,7 +1116,7 @@ pub(crate) mod tests {
|
||||
|
||||
// Test get() operations on random keys, most of which will not exist
|
||||
for _ in 0..100000 {
|
||||
let key_int = rand::thread_rng().r#gen::<u128>();
|
||||
let key_int = rand::rng().random::<u128>();
|
||||
let search_key = u128::to_be_bytes(key_int);
|
||||
assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
|
||||
}
|
||||
|
||||
@@ -508,8 +508,8 @@ mod tests {
|
||||
|
||||
let write_nbytes = cap * 2 + cap / 2;
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
let content: Vec<u8> = rand::rng()
|
||||
.sample_iter(rand::distr::StandardUniform)
|
||||
.take(write_nbytes)
|
||||
.collect();
|
||||
|
||||
@@ -565,8 +565,8 @@ mod tests {
|
||||
let cap = writer.mutable().capacity();
|
||||
drop(writer);
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
let content: Vec<u8> = rand::rng()
|
||||
.sample_iter(rand::distr::StandardUniform)
|
||||
.take(cap * 2 + cap / 2)
|
||||
.collect();
|
||||
|
||||
@@ -614,8 +614,8 @@ mod tests {
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
drop(writer);
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
let content: Vec<u8> = rand::rng()
|
||||
.sample_iter(rand::distr::StandardUniform)
|
||||
.take(cap * 2 + cap / 2)
|
||||
.collect();
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ use pageserver_api::shard::{
|
||||
};
|
||||
use pageserver_api::upcall_api::ReAttachResponseTenant;
|
||||
use rand::Rng;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use rand::distr::Alphanumeric;
|
||||
use remote_storage::TimeoutOrCancel;
|
||||
use sysinfo::SystemExt;
|
||||
use tokio::fs;
|
||||
@@ -218,7 +218,7 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
"Path must be absolute",
|
||||
))?;
|
||||
let rand_suffix = rand::thread_rng()
|
||||
let rand_suffix = rand::rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(8)
|
||||
.map(char::from)
|
||||
@@ -328,7 +328,7 @@ fn emergency_generations(
|
||||
LocationMode::Attached(alc) => TenantStartupMode::Attached((
|
||||
alc.attach_mode,
|
||||
alc.generation,
|
||||
ShardStripeSize::default(),
|
||||
lc.shard.stripe_size,
|
||||
)),
|
||||
LocationMode::Secondary(_) => TenantStartupMode::Secondary,
|
||||
},
|
||||
@@ -352,7 +352,8 @@ async fn init_load_generations(
|
||||
let client = StorageControllerUpcallClient::new(conf, cancel);
|
||||
info!("Calling {} API to re-attach tenants", client.base_url());
|
||||
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
|
||||
match client.re_attach(conf).await {
|
||||
let empty_local_disk = tenant_confs.is_empty();
|
||||
match client.re_attach(conf, empty_local_disk).await {
|
||||
Ok(tenants) => tenants
|
||||
.into_iter()
|
||||
.flat_map(|(id, rart)| {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use chrono::NaiveDateTime;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::ShardStripeSize;
|
||||
|
||||
/// Tenant shard manifest, stored in remote storage. Contains offloaded timelines and other tenant
|
||||
/// shard-wide information that must be persisted in remote storage.
|
||||
|
||||
@@ -25,7 +25,7 @@ pub(super) fn period_jitter(d: Duration, pct: u32) -> Duration {
|
||||
if d == Duration::ZERO {
|
||||
d
|
||||
} else {
|
||||
rand::thread_rng().gen_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
|
||||
rand::rng().random_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ pub(super) fn period_warmup(period: Duration) -> Duration {
|
||||
if period == Duration::ZERO {
|
||||
period
|
||||
} else {
|
||||
rand::thread_rng().gen_range(Duration::ZERO..period)
|
||||
rand::rng().random_range(Duration::ZERO..period)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1634,7 +1634,8 @@ pub(crate) mod test {
|
||||
use bytes::Bytes;
|
||||
use itertools::MinMaxResult;
|
||||
use postgres_ffi::PgMajorVersion;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use rand::prelude::{SeedableRng, StdRng};
|
||||
use rand::seq::IndexedRandom;
|
||||
use rand::{Rng, RngCore};
|
||||
|
||||
/// Construct an index for a fictional delta layer and and then
|
||||
@@ -1788,14 +1789,14 @@ pub(crate) mod test {
|
||||
|
||||
let mut entries = Vec::new();
|
||||
for _ in 0..constants::KEY_COUNT {
|
||||
let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
|
||||
let count = rng.random_range(1..constants::MAX_ENTRIES_PER_KEY);
|
||||
let mut lsns_iter =
|
||||
std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
|
||||
Some(Lsn(lsn.0 + 0x08))
|
||||
});
|
||||
let mut lsns = Vec::new();
|
||||
while lsns.len() < count as usize {
|
||||
let take = rng.gen_bool(0.5);
|
||||
let take = rng.random_bool(0.5);
|
||||
let lsn = lsns_iter.next().unwrap();
|
||||
if take {
|
||||
lsns.push(lsn);
|
||||
@@ -1869,12 +1870,13 @@ pub(crate) mod test {
|
||||
for _ in 0..constants::RANGES_COUNT {
|
||||
let mut range: Option<Range<Key>> = Option::default();
|
||||
while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
|
||||
let range_start = rng.gen_range(start..end);
|
||||
let range_start = rng.random_range(start..end);
|
||||
let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
|
||||
if range_end_offset >= end {
|
||||
range = Some(Key::from_i128(range_start)..Key::from_i128(end));
|
||||
} else {
|
||||
let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
|
||||
let range_end =
|
||||
rng.random_range((range_start + constants::MIN_RANGE_SIZE)..end);
|
||||
range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -440,8 +440,8 @@ mod tests {
|
||||
impl InMemoryFile {
|
||||
fn new_random(len: usize) -> Self {
|
||||
Self {
|
||||
content: rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
content: rand::rng()
|
||||
.sample_iter(rand::distr::StandardUniform)
|
||||
.take(len)
|
||||
.collect(),
|
||||
}
|
||||
@@ -498,7 +498,7 @@ mod tests {
|
||||
len
|
||||
}
|
||||
};
|
||||
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[nread..]); // to discover bugs
|
||||
rand::Rng::fill(&mut rand::rng(), &mut dst_slice[nread..]); // to discover bugs
|
||||
Ok((dst, nread))
|
||||
}
|
||||
}
|
||||
@@ -763,7 +763,7 @@ mod tests {
|
||||
let len = std::cmp::min(dst.bytes_total(), mocked_bytes.len());
|
||||
let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed();
|
||||
dst_slice[..len].copy_from_slice(&mocked_bytes[..len]);
|
||||
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[len..]); // to discover bugs
|
||||
rand::Rng::fill(&mut rand::rng(), &mut dst_slice[len..]); // to discover bugs
|
||||
Ok((dst, len))
|
||||
}
|
||||
Err(e) => Err(std::io::Error::other(e)),
|
||||
|
||||
@@ -515,7 +515,7 @@ pub(crate) async fn sleep_random_range(
|
||||
interval: RangeInclusive<Duration>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Duration, Cancelled> {
|
||||
let delay = rand::thread_rng().gen_range(interval);
|
||||
let delay = rand::rng().random_range(interval);
|
||||
if delay == Duration::ZERO {
|
||||
return Ok(delay);
|
||||
}
|
||||
|
||||
@@ -1324,6 +1324,9 @@ impl Timeline {
|
||||
///
|
||||
/// This naive implementation will be replaced with a more efficient one
|
||||
/// which actually vectorizes the read path.
|
||||
///
|
||||
/// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
|
||||
/// if the client goes away (e.g. due to timeout or cancellation).
|
||||
pub(crate) async fn get_vectored(
|
||||
&self,
|
||||
query: VersionedKeySpaceQuery,
|
||||
@@ -2823,7 +2826,7 @@ impl Timeline {
|
||||
if r.numerator == 0 {
|
||||
false
|
||||
} else {
|
||||
rand::thread_rng().gen_range(0..r.denominator) < r.numerator
|
||||
rand::rng().random_range(0..r.denominator) < r.numerator
|
||||
}
|
||||
}
|
||||
None => false,
|
||||
@@ -3905,7 +3908,7 @@ impl Timeline {
|
||||
// 1hour base
|
||||
(60_i64 * 60_i64)
|
||||
// 10min jitter
|
||||
+ rand::thread_rng().gen_range(-10 * 60..10 * 60),
|
||||
+ rand::rng().random_range(-10 * 60..10 * 60),
|
||||
)
|
||||
.expect("10min < 1hour"),
|
||||
);
|
||||
|
||||
@@ -654,7 +654,7 @@ mod tests {
|
||||
use pageserver_api::key::{DBDIR_KEY, Key, rel_block_to_key};
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use pageserver_api::shard::DEFAULT_STRIPE_SIZE;
|
||||
use utils::shard::ShardCount;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
@@ -955,7 +955,7 @@ mod tests {
|
||||
});
|
||||
let child_params = ShardParameters {
|
||||
count: ShardCount(2),
|
||||
stripe_size: ShardStripeSize::default(),
|
||||
stripe_size: DEFAULT_STRIPE_SIZE,
|
||||
};
|
||||
let child0 = Arc::new_cyclic(|myself| StubTimeline {
|
||||
gate: Default::default(),
|
||||
|
||||
@@ -1275,8 +1275,8 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use rand::Rng;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::{Rng, thread_rng};
|
||||
|
||||
use super::*;
|
||||
use crate::context::DownloadBehavior;
|
||||
@@ -1358,7 +1358,7 @@ mod tests {
|
||||
|
||||
// Check that all the other FDs still work too. Use them in random order for
|
||||
// good measure.
|
||||
file_b_dupes.as_mut_slice().shuffle(&mut thread_rng());
|
||||
file_b_dupes.as_mut_slice().shuffle(&mut rand::rng());
|
||||
for vfile in file_b_dupes.iter_mut() {
|
||||
assert_first_512_eq(vfile, b"content_b").await;
|
||||
}
|
||||
@@ -1413,9 +1413,8 @@ mod tests {
|
||||
let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
let hdl = rt.spawn(async move {
|
||||
let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
|
||||
let mut rng = rand::rngs::OsRng;
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
let f = &files[rand::rng().random_range(0..files.len())];
|
||||
buf = f
|
||||
.read_exact_at(buf.slice_full(), 0, &ctx)
|
||||
.await
|
||||
|
||||
@@ -5,6 +5,7 @@ MODULE_big = neon
|
||||
OBJS = \
|
||||
$(WIN32RES) \
|
||||
communicator.o \
|
||||
communicator_process.o \
|
||||
extension_server.o \
|
||||
file_cache.o \
|
||||
hll.o \
|
||||
@@ -29,6 +30,11 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
UNAME_S := $(shell uname -s)
|
||||
ifeq ($(UNAME_S), Darwin)
|
||||
SHLIB_LINK += -framework Security -framework CoreFoundation -framework SystemConfiguration
|
||||
endif
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = \
|
||||
neon--1.0.sql \
|
||||
@@ -57,7 +63,8 @@ WALPROP_OBJS = \
|
||||
|
||||
# libcommunicator.a is built by cargo from the Rust sources under communicator/
|
||||
# subdirectory. `cargo build` also generates communicator_bindings.h.
|
||||
neon.o: communicator/communicator_bindings.h
|
||||
communicator_process.o: communicator/communicator_bindings.h
|
||||
file_cache.o: communicator/communicator_bindings.h
|
||||
|
||||
$(NEON_CARGO_ARTIFACT_TARGET_DIR)/libcommunicator.a communicator/communicator_bindings.h &:
|
||||
(cd $(srcdir)/communicator && cargo build $(CARGO_BUILD_FLAGS) $(CARGO_PROFILE))
|
||||
@@ -80,11 +87,14 @@ libwalproposer.a: $(WALPROP_OBJS)
|
||||
# INDENT pointing to pg_bsd_indent
|
||||
# PGINDENT_SCRIPT pointing to pgindent (be careful with PGINDENT var name:
|
||||
# pgindent will pick it up as pg_bsd_indent path).
|
||||
#
|
||||
# optional vars:
|
||||
# PGINDENT_FLAGS additional flags to pass to pgindent
|
||||
.PHONY: pgindent
|
||||
pgindent:
|
||||
+@ echo top_srcdir=$(top_srcdir) top_builddir=$(top_builddir) srcdir=$(srcdir)
|
||||
$(FIND_TYPEDEF) . > neon.typedefs
|
||||
INDENT=$(INDENT) $(PGINDENT_SCRIPT) --typedefs neon.typedefs $(srcdir)/*.c $(srcdir)/*.h
|
||||
INDENT=$(INDENT) $(PGINDENT_SCRIPT) $(PGINDENT_FLAGS) --typedefs neon.typedefs $(srcdir)/*.c $(srcdir)/*.h
|
||||
|
||||
PG_CONFIG = pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
|
||||
@@ -1820,12 +1820,12 @@ nm_to_string(NeonMessage *msg)
|
||||
}
|
||||
case T_NeonGetPageResponse:
|
||||
{
|
||||
#if 0
|
||||
NeonGetPageResponse *msg_resp = (NeonGetPageResponse *) msg;
|
||||
#endif
|
||||
|
||||
appendStringInfoString(&s, "{\"type\": \"NeonGetPageResponse\"");
|
||||
appendStringInfo(&s, ", \"page\": \"XXX\"}");
|
||||
appendStringInfo(&s, ", \"rinfo\": %u/%u/%u", RelFileInfoFmt(msg_resp->req.rinfo));
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_resp->req.forknum);
|
||||
appendStringInfo(&s, ", \"blkno\": %u", msg_resp->req.blkno);
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -11,9 +11,19 @@ crate-type = ["staticlib"]
|
||||
# 'testing' feature is currently unused in the communicator, but we accept it for convenience of
|
||||
# calling build scripts, so that you can pass the same feature to all packages.
|
||||
testing = []
|
||||
# 'rest_broker' feature is currently unused in the communicator, but we accept it for convenience of
|
||||
# calling build scripts, so that you can pass the same feature to all packages.
|
||||
rest_broker = []
|
||||
|
||||
[dependencies]
|
||||
neon-shmem.workspace = true
|
||||
axum.workspace = true
|
||||
http.workspace = true
|
||||
tokio = { workspace = true, features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] }
|
||||
tracing.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
|
||||
measured.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../../workspace_hack" }
|
||||
|
||||
[build-dependencies]
|
||||
|
||||
@@ -1,7 +1,22 @@
|
||||
This package will evolve into a "compute-pageserver communicator"
|
||||
process and machinery. For now, it's just a dummy that doesn't do
|
||||
anything interesting, but it allows us to test the compilation and
|
||||
linking of Rust code into the Postgres extensions.
|
||||
# Communicator
|
||||
|
||||
This package provides the so-called "compute-pageserver communicator",
|
||||
or just "communicator" in short. The communicator is a separate
|
||||
background worker process that runs in the PostgreSQL server. It's
|
||||
part of the neon extension. Currently, it only provides an HTTP
|
||||
endpoint for metrics, but in the future it will evolve to handle all
|
||||
communications with the pageservers.
|
||||
|
||||
## Source code view
|
||||
|
||||
pgxn/neon/communicator_process.c
|
||||
Contains code needed to start up the communicator process, and
|
||||
the glue that interacts with PostgreSQL code and the Rust
|
||||
code in the communicator process.
|
||||
|
||||
|
||||
pgxn/neon/communicator/src/worker_process/
|
||||
Worker process main loop and glue code
|
||||
|
||||
At compilation time, pgxn/neon/communicator/ produces a static
|
||||
library, libcommunicator.a. It is linked to the neon.so extension
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
/// dummy function, just to test linking Rust functions into the C
|
||||
/// extension
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn communicator_dummy(arg: u32) -> u32 {
|
||||
arg + 1
|
||||
}
|
||||
mod worker_process;
|
||||
|
||||
/// Name of the Unix Domain Socket that serves the metrics, and other APIs in the
|
||||
/// future. This is within the Postgres data directory.
|
||||
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
|
||||
|
||||
51
pgxn/neon/communicator/src/worker_process/callbacks.rs
Normal file
51
pgxn/neon/communicator/src/worker_process/callbacks.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
//! C callbacks to PostgreSQL facilities that the neon extension needs to provide. These
|
||||
//! are implemented in `neon/pgxn/communicator_process.c`. The function signatures better
|
||||
//! match!
|
||||
//!
|
||||
//! These are called from the communicator threads! Careful what you do, most Postgres
|
||||
//! functions are not safe to call in that context.
|
||||
|
||||
#[cfg(not(test))]
|
||||
unsafe extern "C" {
|
||||
pub fn callback_set_my_latch_unsafe();
|
||||
pub fn callback_get_lfc_metrics_unsafe() -> LfcMetrics;
|
||||
}
|
||||
|
||||
// Compile unit tests with dummy versions of the functions. Unit tests cannot call back
|
||||
// into the C code. (As of this writing, no unit tests even exists in the communicator
|
||||
// package, but the code coverage build still builds these and tries to link with the
|
||||
// external C code.)
|
||||
#[cfg(test)]
|
||||
unsafe fn callback_set_my_latch_unsafe() {
|
||||
panic!("not usable in unit tests");
|
||||
}
|
||||
#[cfg(test)]
|
||||
unsafe fn callback_get_lfc_metrics_unsafe() -> LfcMetrics {
|
||||
panic!("not usable in unit tests");
|
||||
}
|
||||
|
||||
// safe wrappers
|
||||
|
||||
pub(super) fn callback_set_my_latch() {
|
||||
unsafe { callback_set_my_latch_unsafe() };
|
||||
}
|
||||
|
||||
pub(super) fn callback_get_lfc_metrics() -> LfcMetrics {
|
||||
unsafe { callback_get_lfc_metrics_unsafe() }
|
||||
}
|
||||
|
||||
/// Return type of the callback_get_lfc_metrics() function.
|
||||
#[repr(C)]
|
||||
pub struct LfcMetrics {
|
||||
pub lfc_cache_size_limit: i64,
|
||||
pub lfc_hits: i64,
|
||||
pub lfc_misses: i64,
|
||||
pub lfc_used: i64,
|
||||
pub lfc_writes: i64,
|
||||
|
||||
// working set size looking back 1..60 minutes.
|
||||
//
|
||||
// Index 0 is the size of the working set accessed within last 1 minute,
|
||||
// index 59 is the size of the working set accessed within last 60 minutes.
|
||||
pub lfc_approximate_working_set_size_windows: [i64; 60],
|
||||
}
|
||||
102
pgxn/neon/communicator/src/worker_process/control_socket.rs
Normal file
102
pgxn/neon/communicator/src/worker_process/control_socket.rs
Normal file
@@ -0,0 +1,102 @@
|
||||
//! Communicator control socket.
|
||||
//!
|
||||
//! Currently, the control socket is used to provide information about the communicator
|
||||
//! process, file cache etc. as prometheus metrics. In the future, it can be used to
|
||||
//! expose more things.
|
||||
//!
|
||||
//! The exporter speaks HTTP, listens on a Unix Domain Socket under the Postgres
|
||||
//! data directory. For debugging, you can access it with curl:
|
||||
//!
|
||||
//! ```sh
|
||||
//! curl --unix-socket neon-communicator.socket http://localhost/metrics
|
||||
//! ```
|
||||
//!
|
||||
use axum::Router;
|
||||
use axum::body::Body;
|
||||
use axum::extract::State;
|
||||
use axum::response::Response;
|
||||
use http::StatusCode;
|
||||
use http::header::CONTENT_TYPE;
|
||||
|
||||
use measured::MetricGroup;
|
||||
use measured::text::BufferedTextEncoder;
|
||||
|
||||
use std::io::ErrorKind;
|
||||
|
||||
use tokio::net::UnixListener;
|
||||
|
||||
use crate::NEON_COMMUNICATOR_SOCKET_NAME;
|
||||
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
|
||||
|
||||
impl CommunicatorWorkerProcessStruct {
|
||||
/// Launch the listener
|
||||
pub(crate) async fn launch_control_socket_listener(
|
||||
&'static self,
|
||||
) -> Result<(), std::io::Error> {
|
||||
use axum::routing::get;
|
||||
let app = Router::new()
|
||||
.route("/metrics", get(get_metrics))
|
||||
.route("/autoscaling_metrics", get(get_autoscaling_metrics))
|
||||
.route("/debug/panic", get(handle_debug_panic))
|
||||
.with_state(self);
|
||||
|
||||
// If the server is restarted, there might be an old socket still
|
||||
// lying around. Remove it first.
|
||||
match std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) {
|
||||
Ok(()) => {
|
||||
tracing::warn!("removed stale control socket");
|
||||
}
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => {}
|
||||
Err(e) => {
|
||||
tracing::error!("could not remove stale control socket: {e:#}");
|
||||
// Try to proceed anyway. It will likely fail below though.
|
||||
}
|
||||
};
|
||||
|
||||
// Create the unix domain socket and start listening on it
|
||||
let listener = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?;
|
||||
|
||||
tokio::spawn(async {
|
||||
tracing::info!("control socket listener spawned");
|
||||
axum::serve(listener, app)
|
||||
.await
|
||||
.expect("axum::serve never returns")
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Expose all Prometheus metrics.
|
||||
async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response {
|
||||
tracing::trace!("/metrics requested");
|
||||
metrics_to_response(&state).await
|
||||
}
|
||||
|
||||
/// Expose Prometheus metrics, for use by the autoscaling agent.
|
||||
///
|
||||
/// This is a subset of all the metrics.
|
||||
async fn get_autoscaling_metrics(
|
||||
State(state): State<&CommunicatorWorkerProcessStruct>,
|
||||
) -> Response {
|
||||
tracing::trace!("/metrics requested");
|
||||
metrics_to_response(&state.lfc_metrics).await
|
||||
}
|
||||
|
||||
async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response {
|
||||
panic!("test HTTP handler task panic");
|
||||
}
|
||||
|
||||
/// Helper function to convert prometheus metrics to a text response
|
||||
async fn metrics_to_response(metrics: &(dyn MetricGroup<BufferedTextEncoder> + Sync)) -> Response {
|
||||
let mut enc = BufferedTextEncoder::new();
|
||||
metrics
|
||||
.collect_group_into(&mut enc)
|
||||
.unwrap_or_else(|never| match never {});
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, "application/text")
|
||||
.body(Body::from(enc.finish()))
|
||||
.unwrap()
|
||||
}
|
||||
83
pgxn/neon/communicator/src/worker_process/lfc_metrics.rs
Normal file
83
pgxn/neon/communicator/src/worker_process/lfc_metrics.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use measured::{
|
||||
FixedCardinalityLabel, Gauge, GaugeVec, LabelGroup, MetricGroup,
|
||||
label::{LabelName, LabelValue, StaticLabelSet},
|
||||
metric::{MetricEncoding, gauge::GaugeState, group::Encoding},
|
||||
};
|
||||
|
||||
use super::callbacks::callback_get_lfc_metrics;
|
||||
|
||||
pub(crate) struct LfcMetricsCollector;
|
||||
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new())]
|
||||
struct LfcMetricsGroup {
|
||||
/// LFC cache size limit in bytes
|
||||
lfc_cache_size_limit: Gauge,
|
||||
/// LFC cache hits
|
||||
lfc_hits: Gauge,
|
||||
/// LFC cache misses
|
||||
lfc_misses: Gauge,
|
||||
/// LFC chunks used (chunk = 1MB)
|
||||
lfc_used: Gauge,
|
||||
/// LFC cache writes
|
||||
lfc_writes: Gauge,
|
||||
/// Approximate working set size in pages of 8192 bytes
|
||||
#[metric(init = GaugeVec::dense())]
|
||||
lfc_approximate_working_set_size_windows: GaugeVec<StaticLabelSet<MinuteAsSeconds>>,
|
||||
}
|
||||
|
||||
impl<T: Encoding> MetricGroup<T> for LfcMetricsCollector
|
||||
where
|
||||
GaugeState: MetricEncoding<T>,
|
||||
{
|
||||
fn collect_group_into(&self, enc: &mut T) -> Result<(), <T as Encoding>::Err> {
|
||||
let g = LfcMetricsGroup::new();
|
||||
|
||||
let lfc_metrics = callback_get_lfc_metrics();
|
||||
|
||||
g.lfc_cache_size_limit.set(lfc_metrics.lfc_cache_size_limit);
|
||||
g.lfc_hits.set(lfc_metrics.lfc_hits);
|
||||
g.lfc_misses.set(lfc_metrics.lfc_misses);
|
||||
g.lfc_used.set(lfc_metrics.lfc_used);
|
||||
g.lfc_writes.set(lfc_metrics.lfc_writes);
|
||||
|
||||
for i in 0..60 {
|
||||
let val = lfc_metrics.lfc_approximate_working_set_size_windows[i];
|
||||
g.lfc_approximate_working_set_size_windows
|
||||
.set(MinuteAsSeconds(i), val);
|
||||
}
|
||||
|
||||
g.collect_group_into(enc)
|
||||
}
|
||||
}
|
||||
|
||||
/// This stores the values in range 0..60,
|
||||
/// encodes them as seconds (60, 120, 180, ..., 3600)
|
||||
#[derive(Clone, Copy)]
|
||||
struct MinuteAsSeconds(usize);
|
||||
|
||||
impl FixedCardinalityLabel for MinuteAsSeconds {
|
||||
fn cardinality() -> usize {
|
||||
60
|
||||
}
|
||||
|
||||
fn encode(&self) -> usize {
|
||||
self.0
|
||||
}
|
||||
|
||||
fn decode(value: usize) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelValue for MinuteAsSeconds {
|
||||
fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
|
||||
v.write_int((self.0 + 1) as i64 * 60)
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelGroup for MinuteAsSeconds {
|
||||
fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
|
||||
v.write_value(LabelName::from_str("duration_seconds"), self);
|
||||
}
|
||||
}
|
||||
250
pgxn/neon/communicator/src/worker_process/logging.rs
Normal file
250
pgxn/neon/communicator/src/worker_process/logging.rs
Normal file
@@ -0,0 +1,250 @@
|
||||
//! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log
|
||||
//!
|
||||
//! In the Rust threads, the log messages are written to a mpsc Channel, and the Postgres
|
||||
//! process latch is raised. That wakes up the loop in the main thread, see
|
||||
//! `communicator_new_bgworker_main()`. It reads the message from the channel and
|
||||
//! ereport()s it. This ensures that only one thread, the main thread, calls the
|
||||
//! PostgreSQL logging routines at any time.
|
||||
|
||||
use std::ffi::c_char;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::sync::mpsc::{Receiver, SyncSender};
|
||||
use std::sync::mpsc::{TryRecvError, TrySendError};
|
||||
|
||||
use tracing::info;
|
||||
use tracing::{Event, Level, Metadata, Subscriber};
|
||||
use tracing_subscriber::filter::LevelFilter;
|
||||
use tracing_subscriber::fmt::format::Writer;
|
||||
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields, MakeWriter};
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
|
||||
use crate::worker_process::callbacks::callback_set_my_latch;
|
||||
|
||||
/// This handle is passed to the C code, and used by [`communicator_worker_poll_logging`]
|
||||
pub struct LoggingReceiver {
|
||||
receiver: Receiver<FormattedEventWithMeta>,
|
||||
}
|
||||
|
||||
/// This is passed to `tracing`
|
||||
struct LoggingSender {
|
||||
sender: SyncSender<FormattedEventWithMeta>,
|
||||
}
|
||||
|
||||
static DROPPED_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
/// Called once, at worker process startup. The returned LoggingState is passed back
|
||||
/// in the subsequent calls to `pump_logging`. It is opaque to the C code.
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn communicator_worker_configure_logging() -> Box<LoggingReceiver> {
|
||||
let (sender, receiver) = sync_channel(1000);
|
||||
|
||||
let receiver = LoggingReceiver { receiver };
|
||||
let sender = LoggingSender { sender };
|
||||
|
||||
use tracing_subscriber::prelude::*;
|
||||
let r = tracing_subscriber::registry();
|
||||
|
||||
let r = r.with(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
.event_format(SimpleFormatter)
|
||||
.with_writer(sender)
|
||||
// TODO: derive this from log_min_messages? Currently the code in
|
||||
// communicator_process.c forces log_min_messages='INFO'.
|
||||
.with_filter(LevelFilter::from_level(Level::INFO)),
|
||||
);
|
||||
r.init();
|
||||
|
||||
info!("communicator process logging started");
|
||||
|
||||
Box::new(receiver)
|
||||
}
|
||||
|
||||
/// Read one message from the logging queue. This is essentially a wrapper to Receiver,
|
||||
/// with a C-friendly signature.
|
||||
///
|
||||
/// The message is copied into *errbuf, which is a caller-supplied buffer of size
|
||||
/// `errbuf_len`. If the message doesn't fit in the buffer, it is truncated. It is always
|
||||
/// NULL-terminated.
|
||||
///
|
||||
/// The error level is returned *elevel_p. It's one of the PostgreSQL error levels, see
|
||||
/// elog.h
|
||||
///
|
||||
/// If there was a message, *dropped_event_count_p is also updated with a counter of how
|
||||
/// many log messages in total has been dropped. By comparing that with the value from
|
||||
/// previous call, you can tell how many were dropped since last call.
|
||||
///
|
||||
/// Returns:
|
||||
///
|
||||
/// 0 if there were no messages
|
||||
/// 1 if there was a message. The message and its level are returned in
|
||||
/// *errbuf and *elevel_p. *dropped_event_count_p is also updated.
|
||||
/// -1 on error, i.e the other end of the queue was disconnected
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn communicator_worker_poll_logging(
|
||||
state: &mut LoggingReceiver,
|
||||
errbuf: *mut c_char,
|
||||
errbuf_len: u32,
|
||||
elevel_p: &mut i32,
|
||||
dropped_event_count_p: &mut u64,
|
||||
) -> i32 {
|
||||
let msg = match state.receiver.try_recv() {
|
||||
Err(TryRecvError::Empty) => return 0,
|
||||
Err(TryRecvError::Disconnected) => return -1,
|
||||
Ok(msg) => msg,
|
||||
};
|
||||
|
||||
let src: &[u8] = &msg.message;
|
||||
let dst: *mut u8 = errbuf.cast();
|
||||
let len = std::cmp::min(src.len(), errbuf_len as usize - 1);
|
||||
unsafe {
|
||||
std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len);
|
||||
*(dst.add(len)) = b'\0'; // NULL terminator
|
||||
}
|
||||
|
||||
// Map the tracing Level to PostgreSQL elevel.
|
||||
//
|
||||
// XXX: These levels are copied from PostgreSQL's elog.h. Introduce another enum to
|
||||
// hide these?
|
||||
*elevel_p = match msg.level {
|
||||
Level::TRACE => 10, // DEBUG5
|
||||
Level::DEBUG => 14, // DEBUG1
|
||||
Level::INFO => 17, // INFO
|
||||
Level::WARN => 19, // WARNING
|
||||
Level::ERROR => 21, // ERROR
|
||||
};
|
||||
|
||||
*dropped_event_count_p = DROPPED_EVENT_COUNT.load(Ordering::Relaxed);
|
||||
|
||||
1
|
||||
}
|
||||
|
||||
//---- The following functions can be called from any thread ----
|
||||
|
||||
#[derive(Clone)]
|
||||
struct FormattedEventWithMeta {
|
||||
message: Vec<u8>,
|
||||
level: tracing::Level,
|
||||
}
|
||||
|
||||
impl Default for FormattedEventWithMeta {
|
||||
fn default() -> Self {
|
||||
FormattedEventWithMeta {
|
||||
message: Vec::new(),
|
||||
level: tracing::Level::DEBUG,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct EventBuilder<'a> {
|
||||
event: FormattedEventWithMeta,
|
||||
|
||||
sender: &'a LoggingSender,
|
||||
}
|
||||
|
||||
impl std::io::Write for EventBuilder<'_> {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.event.message.write(buf)
|
||||
}
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
self.sender.send_event(self.event.clone());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EventBuilder<'_> {
|
||||
fn drop(&mut self) {
|
||||
let sender = self.sender;
|
||||
let event = std::mem::take(&mut self.event);
|
||||
|
||||
sender.send_event(event);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> MakeWriter<'a> for LoggingSender {
|
||||
type Writer = EventBuilder<'a>;
|
||||
|
||||
fn make_writer(&'a self) -> Self::Writer {
|
||||
panic!("not expected to be called when make_writer_for is implemented");
|
||||
}
|
||||
|
||||
fn make_writer_for(&'a self, meta: &Metadata<'_>) -> Self::Writer {
|
||||
EventBuilder {
|
||||
event: FormattedEventWithMeta {
|
||||
message: Vec::new(),
|
||||
level: *meta.level(),
|
||||
},
|
||||
sender: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LoggingSender {
|
||||
fn send_event(&self, e: FormattedEventWithMeta) {
|
||||
match self.sender.try_send(e) {
|
||||
Ok(()) => {
|
||||
// notify the main thread
|
||||
callback_set_my_latch();
|
||||
}
|
||||
Err(TrySendError::Disconnected(_)) => {}
|
||||
Err(TrySendError::Full(_)) => {
|
||||
// The queue is full, cannot send any more. To avoid blocking the tokio
|
||||
// thread, simply drop the message. Better to lose some logs than get
|
||||
// stuck if there's a problem with the logging.
|
||||
//
|
||||
// Record the fact that was a message was dropped by incrementing the
|
||||
// counter.
|
||||
DROPPED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Simple formatter implementation for tracing_subscriber, which prints the log spans and
|
||||
/// message part like the default formatter, but no timestamp or error level. The error
|
||||
/// level is captured separately by `FormattedEventWithMeta', and when the error is
|
||||
/// printed by the main thread, with PostgreSQL ereport(), it gets a timestamp at that
|
||||
/// point. (The timestamp printed will therefore lag behind the timestamp on the event
|
||||
/// here, if the main thread doesn't process the log message promptly)
|
||||
struct SimpleFormatter;
|
||||
|
||||
impl<S, N> FormatEvent<S, N> for SimpleFormatter
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
N: for<'a> FormatFields<'a> + 'static,
|
||||
{
|
||||
fn format_event(
|
||||
&self,
|
||||
ctx: &FmtContext<'_, S, N>,
|
||||
mut writer: Writer<'_>,
|
||||
event: &Event<'_>,
|
||||
) -> std::fmt::Result {
|
||||
// Format all the spans in the event's span context.
|
||||
if let Some(scope) = ctx.event_scope() {
|
||||
for span in scope.from_root() {
|
||||
write!(writer, "{}", span.name())?;
|
||||
|
||||
// `FormattedFields` is a formatted representation of the span's fields,
|
||||
// which is stored in its extensions by the `fmt` layer's `new_span`
|
||||
// method. The fields will have been formatted by the same field formatter
|
||||
// that's provided to the event formatter in the `FmtContext`.
|
||||
let ext = span.extensions();
|
||||
let fields = &ext
|
||||
.get::<FormattedFields<N>>()
|
||||
.expect("will never be `None`");
|
||||
|
||||
// Skip formatting the fields if the span had no fields.
|
||||
if !fields.is_empty() {
|
||||
write!(writer, "{{{fields}}}")?;
|
||||
}
|
||||
write!(writer, ": ")?;
|
||||
}
|
||||
}
|
||||
|
||||
// Write fields on the event
|
||||
ctx.field_format().format_fields(writer.by_ref(), event)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
66
pgxn/neon/communicator/src/worker_process/main_loop.rs
Normal file
66
pgxn/neon/communicator/src/worker_process/main_loop.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
use std::str::FromStr as _;
|
||||
|
||||
use crate::worker_process::lfc_metrics::LfcMetricsCollector;
|
||||
|
||||
use measured::MetricGroup;
|
||||
use measured::metric::MetricEncoding;
|
||||
use measured::metric::gauge::GaugeState;
|
||||
use measured::metric::group::Encoding;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub struct CommunicatorWorkerProcessStruct {
|
||||
runtime: tokio::runtime::Runtime,
|
||||
|
||||
/*** Metrics ***/
|
||||
pub(crate) lfc_metrics: LfcMetricsCollector,
|
||||
}
|
||||
|
||||
/// Launch the communicator process's Rust subsystems
|
||||
pub(super) fn init(
|
||||
tenant_id: Option<&str>,
|
||||
timeline_id: Option<&str>,
|
||||
) -> Result<&'static CommunicatorWorkerProcessStruct, String> {
|
||||
// The caller validated these already
|
||||
let _tenant_id = tenant_id
|
||||
.map(TenantId::from_str)
|
||||
.transpose()
|
||||
.map_err(|e| format!("invalid tenant ID: {e}"))?;
|
||||
let _timeline_id = timeline_id
|
||||
.map(TimelineId::from_str)
|
||||
.transpose()
|
||||
.map_err(|e| format!("invalid timeline ID: {e}"))?;
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.thread_name("communicator thread")
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let worker_struct = CommunicatorWorkerProcessStruct {
|
||||
// Note: it's important to not drop the runtime, or all the tasks are dropped
|
||||
// too. Including it in the returned struct is one way to keep it around.
|
||||
runtime,
|
||||
|
||||
// metrics
|
||||
lfc_metrics: LfcMetricsCollector,
|
||||
};
|
||||
let worker_struct = Box::leak(Box::new(worker_struct));
|
||||
|
||||
// Start the listener on the control socket
|
||||
worker_struct
|
||||
.runtime
|
||||
.block_on(worker_struct.launch_control_socket_listener())
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
Ok(worker_struct)
|
||||
}
|
||||
|
||||
impl<T> MetricGroup<T> for CommunicatorWorkerProcessStruct
|
||||
where
|
||||
T: Encoding,
|
||||
GaugeState: MetricEncoding<T>,
|
||||
{
|
||||
fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> {
|
||||
self.lfc_metrics.collect_group_into(enc)
|
||||
}
|
||||
}
|
||||
13
pgxn/neon/communicator/src/worker_process/mod.rs
Normal file
13
pgxn/neon/communicator/src/worker_process/mod.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
//! This code runs in the communicator worker process. This provides
|
||||
//! the glue code to:
|
||||
//!
|
||||
//! - launch the main loop,
|
||||
//! - receive IO requests from backends and process them,
|
||||
//! - write results back to backends.
|
||||
|
||||
mod callbacks;
|
||||
mod control_socket;
|
||||
mod lfc_metrics;
|
||||
mod logging;
|
||||
mod main_loop;
|
||||
mod worker_interface;
|
||||
@@ -0,0 +1,60 @@
|
||||
//! Functions called from the C code in the worker process
|
||||
|
||||
use std::ffi::{CStr, CString, c_char};
|
||||
|
||||
use crate::worker_process::main_loop;
|
||||
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
|
||||
|
||||
/// Launch the communicator's tokio tasks, which do most of the work.
|
||||
///
|
||||
/// The caller has initialized the process as a regular PostgreSQL background worker
|
||||
/// process.
|
||||
///
|
||||
/// Inputs:
|
||||
/// `tenant_id` and `timeline_id` can be NULL, if we're been launched in "non-Neon" mode,
|
||||
/// where we use local storage instead of connecting to remote neon storage. That's
|
||||
/// currently only used in some unit tests.
|
||||
///
|
||||
/// Result:
|
||||
/// Returns pointer to CommunicatorWorkerProcessStruct, which is a handle to running
|
||||
/// Rust tasks. The C code can use it to interact with the Rust parts. On failure, returns
|
||||
/// None/NULL, and an error message is returned in *error_p
|
||||
///
|
||||
/// This is called only once in the process, so the returned struct, and error message in
|
||||
/// case of failure, are simply leaked.
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn communicator_worker_launch(
|
||||
tenant_id: *const c_char,
|
||||
timeline_id: *const c_char,
|
||||
error_p: *mut *const c_char,
|
||||
) -> Option<&'static CommunicatorWorkerProcessStruct> {
|
||||
// Convert the arguments into more convenient Rust types
|
||||
let tenant_id = if tenant_id.is_null() {
|
||||
None
|
||||
} else {
|
||||
let cstr = unsafe { CStr::from_ptr(tenant_id) };
|
||||
Some(cstr.to_str().expect("assume UTF-8"))
|
||||
};
|
||||
let timeline_id = if timeline_id.is_null() {
|
||||
None
|
||||
} else {
|
||||
let cstr = unsafe { CStr::from_ptr(timeline_id) };
|
||||
Some(cstr.to_str().expect("assume UTF-8"))
|
||||
};
|
||||
|
||||
// The `init` function does all the work.
|
||||
let result = main_loop::init(tenant_id, timeline_id);
|
||||
|
||||
// On failure, return the error message to the C caller in *error_p.
|
||||
match result {
|
||||
Ok(worker_struct) => Some(worker_struct),
|
||||
Err(errmsg) => {
|
||||
let errmsg = CString::new(errmsg).expect("no nuls within error message");
|
||||
let errmsg = Box::leak(errmsg.into_boxed_c_str());
|
||||
let p: *const c_char = errmsg.as_ptr();
|
||||
|
||||
unsafe { *error_p = p };
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
273
pgxn/neon/communicator_process.c
Normal file
273
pgxn/neon/communicator_process.c
Normal file
@@ -0,0 +1,273 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* communicator_process.c
|
||||
* Functions for starting up the communicator background worker process.
|
||||
*
|
||||
* Currently, the communicator process only functions as a metrics
|
||||
* exporter. It provides an HTTP endpoint for polling a limited set of
|
||||
* metrics. TODO: In the future, it will do much more, i.e. handle all
|
||||
* the communications with the pageservers.
|
||||
*
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/latch.h"
|
||||
#include "storage/pmsignal.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
#include "communicator_process.h"
|
||||
#include "file_cache.h"
|
||||
#include "neon.h"
|
||||
#include "neon_perf_counters.h"
|
||||
|
||||
/* the rust bindings, generated by cbindgen */
|
||||
#include "communicator/communicator_bindings.h"
|
||||
|
||||
static void pump_logging(struct LoggingReceiver *logging);
|
||||
PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg);
|
||||
|
||||
/**** Initialization functions. These run in postmaster ****/
|
||||
|
||||
void
|
||||
pg_init_communicator_process(void)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
|
||||
/* Initialize the background worker process */
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_PostmasterStart;
|
||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "communicator_new_bgworker_main");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "Storage communicator process");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "Storage communicator process");
|
||||
bgw.bgw_restart_time = 5;
|
||||
bgw.bgw_notify_pid = 0;
|
||||
bgw.bgw_main_arg = (Datum) 0;
|
||||
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
|
||||
/**** Worker process functions. These run in the communicator worker process ****/
|
||||
|
||||
/*
|
||||
* Entry point for the communicator bgworker process
|
||||
*/
|
||||
void
|
||||
communicator_new_bgworker_main(Datum main_arg)
|
||||
{
|
||||
struct LoggingReceiver *logging;
|
||||
const char *errmsg = NULL;
|
||||
const struct CommunicatorWorkerProcessStruct *proc_handle;
|
||||
|
||||
/*
|
||||
* Pretend that this process is a WAL sender. That affects the shutdown
|
||||
* sequence: WAL senders are shut down last, after the final checkpoint
|
||||
* has been written. That's what we want for the communicator process too.
|
||||
*/
|
||||
am_walsender = true;
|
||||
MarkPostmasterChildWalSender();
|
||||
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
/*
|
||||
* Postmaster sends us SIGUSR2 when all regular backends and bgworkers
|
||||
* have exited, and it's time for us to exit too
|
||||
*/
|
||||
pqsignal(SIGUSR2, die);
|
||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
||||
pqsignal(SIGTERM, die);
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
/*
|
||||
* By default, INFO messages are not printed to the log. We want
|
||||
* `tracing::info!` messages emitted from the communicator to be printed,
|
||||
* however, so increase the log level.
|
||||
*
|
||||
* XXX: This overrides any user-set value from the config file. That's not
|
||||
* great, but on the other hand, there should be little reason for user to
|
||||
* control the verbosity of the communicator. It's not too verbose by
|
||||
* default.
|
||||
*/
|
||||
SetConfigOption("log_min_messages", "INFO", PGC_SUSET, PGC_S_OVERRIDE);
|
||||
|
||||
logging = communicator_worker_configure_logging();
|
||||
|
||||
proc_handle = communicator_worker_launch(
|
||||
neon_tenant[0] == '\0' ? NULL : neon_tenant,
|
||||
neon_timeline[0] == '\0' ? NULL : neon_timeline,
|
||||
&errmsg
|
||||
);
|
||||
if (proc_handle == NULL)
|
||||
{
|
||||
/*
|
||||
* Something went wrong. Before exiting, forward any log messages that
|
||||
* might've been generated during the failed launch.
|
||||
*/
|
||||
pump_logging(logging);
|
||||
|
||||
elog(PANIC, "%s", errmsg);
|
||||
}
|
||||
|
||||
/*
|
||||
* The Rust tokio runtime has been launched, and it's running in the
|
||||
* background now. This loop in the main thread handles any interactions
|
||||
* we need with the rest of PostgreSQL.
|
||||
*
|
||||
* NB: This process is now multi-threaded! The Rust threads do not call
|
||||
* into any Postgres functions, but it's not entirely clear which Postgres
|
||||
* functions are safe to call from this main thread either. Be very
|
||||
* careful about adding anything non-trivial here.
|
||||
*
|
||||
* Also note that we try to react quickly to any log messages arriving
|
||||
* from the Rust thread. Be careful to not do anything too expensive here
|
||||
* that might cause delays.
|
||||
*/
|
||||
elog(LOG, "communicator threads started");
|
||||
for (;;)
|
||||
{
|
||||
TimestampTz before;
|
||||
long duration;
|
||||
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
/*
|
||||
* Forward any log messages from the Rust threads into the normal
|
||||
* Postgres logging facility.
|
||||
*/
|
||||
pump_logging(logging);
|
||||
|
||||
/*
|
||||
* Check interrupts like system shutdown or config reload
|
||||
*
|
||||
* We mustn't block for too long within this loop, or we risk the log
|
||||
* queue to fill up and messages to be lost. Also, even if we can keep
|
||||
* up, if there's a long delay between sending a message and printing
|
||||
* it to the log, the timestamps on the messages get skewed, which is
|
||||
* confusing.
|
||||
*
|
||||
* We expect processing interrupts to happen fast enough that it's OK,
|
||||
* but measure it just in case, and print a warning if it takes longer
|
||||
* than 100 ms.
|
||||
*/
|
||||
#define LOG_SKEW_WARNING_MS 100
|
||||
before = GetCurrentTimestamp();
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
if (ConfigReloadPending)
|
||||
{
|
||||
ConfigReloadPending = false;
|
||||
ProcessConfigFile(PGC_SIGHUP);
|
||||
}
|
||||
|
||||
duration = TimestampDifferenceMilliseconds(before, GetCurrentTimestamp());
|
||||
if (duration > LOG_SKEW_WARNING_MS)
|
||||
elog(WARNING, "handling interrupts took %ld ms, communicator log timestamps might be skewed", duration);
|
||||
|
||||
/*
|
||||
* Wait until we are woken up. The rust threads will set the latch
|
||||
* when there's a log message to forward.
|
||||
*/
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
|
||||
0,
|
||||
PG_WAIT_EXTENSION);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
pump_logging(struct LoggingReceiver *logging)
|
||||
{
|
||||
char errbuf[1000];
|
||||
int elevel;
|
||||
int32 rc;
|
||||
static uint64_t last_dropped_event_count = 0;
|
||||
uint64_t dropped_event_count;
|
||||
uint64_t dropped_now;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
rc = communicator_worker_poll_logging(logging,
|
||||
errbuf,
|
||||
sizeof(errbuf),
|
||||
&elevel,
|
||||
&dropped_event_count);
|
||||
if (rc == 0)
|
||||
{
|
||||
/* nothing to do */
|
||||
break;
|
||||
}
|
||||
else if (rc == 1)
|
||||
{
|
||||
/* Because we don't want to exit on error */
|
||||
|
||||
if (message_level_is_interesting(elevel))
|
||||
{
|
||||
/*
|
||||
* Prevent interrupts while cleaning up.
|
||||
*
|
||||
* (Not sure if this is required, but all the error handlers
|
||||
* in Postgres that are installed as sigsetjmp() targets do
|
||||
* this, so let's follow the example)
|
||||
*/
|
||||
HOLD_INTERRUPTS();
|
||||
|
||||
errstart(elevel, TEXTDOMAIN);
|
||||
errmsg_internal("[COMMUNICATOR] %s", errbuf);
|
||||
EmitErrorReport();
|
||||
FlushErrorState();
|
||||
|
||||
/* Now we can allow interrupts again */
|
||||
RESUME_INTERRUPTS();
|
||||
}
|
||||
}
|
||||
else if (rc == -1)
|
||||
{
|
||||
elog(ERROR, "logging channel was closed unexpectedly");
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If the queue was full at any time since the last time we reported it,
|
||||
* report how many messages were lost. We do this outside the loop, so
|
||||
* that if the logging system is clogged, we don't exacerbate it by
|
||||
* printing lots of warnings about dropped messages.
|
||||
*/
|
||||
dropped_now = dropped_event_count - last_dropped_event_count;
|
||||
if (dropped_now != 0)
|
||||
{
|
||||
elog(WARNING, "%lu communicator log messages were dropped because the log buffer was full",
|
||||
(unsigned long) dropped_now);
|
||||
last_dropped_event_count = dropped_event_count;
|
||||
}
|
||||
}
|
||||
|
||||
/****
|
||||
* Callbacks from the rust code, in the communicator process.
|
||||
*
|
||||
* NOTE: These must be thread-safe! It's very limited which PostgreSQL
|
||||
* functions you can use!!!
|
||||
*
|
||||
* The signatures of these need to match those in the Rust code.
|
||||
*/
|
||||
|
||||
void
|
||||
callback_set_my_latch_unsafe(void)
|
||||
{
|
||||
SetLatch(MyLatch);
|
||||
}
|
||||
17
pgxn/neon/communicator_process.h
Normal file
17
pgxn/neon/communicator_process.h
Normal file
@@ -0,0 +1,17 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* communicator_process.h
|
||||
* Communicator process
|
||||
*
|
||||
*
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef COMMUNICATOR_PROCESS_H
|
||||
#define COMMUNICATOR_PROCESS_H
|
||||
|
||||
extern void pg_init_communicator_process(void);
|
||||
|
||||
#endif /* COMMUNICATOR_PROCESS_H */
|
||||
@@ -52,6 +52,8 @@
|
||||
#include "pagestore_client.h"
|
||||
#include "communicator.h"
|
||||
|
||||
#include "communicator/communicator_bindings.h"
|
||||
|
||||
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
|
||||
|
||||
/*
|
||||
@@ -2156,6 +2158,38 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset)
|
||||
return dc;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get metrics, for the built-in metrics exporter that's part of the communicator
|
||||
* process.
|
||||
*
|
||||
* NB: This is called from a Rust tokio task inside the communicator process.
|
||||
* Acquiring lwlocks, elog(), allocating memory or anything else non-trivial
|
||||
* is strictly prohibited here!
|
||||
*/
|
||||
struct LfcMetrics
|
||||
callback_get_lfc_metrics_unsafe(void)
|
||||
{
|
||||
struct LfcMetrics result = {
|
||||
.lfc_cache_size_limit = (int64) lfc_size_limit * 1024 * 1024,
|
||||
.lfc_hits = lfc_ctl ? lfc_ctl->hits : 0,
|
||||
.lfc_misses = lfc_ctl ? lfc_ctl->misses : 0,
|
||||
.lfc_used = lfc_ctl ? lfc_ctl->used : 0,
|
||||
.lfc_writes = lfc_ctl ? lfc_ctl->writes : 0,
|
||||
};
|
||||
|
||||
if (lfc_ctl)
|
||||
{
|
||||
for (int minutes = 1; minutes <= 60; minutes++)
|
||||
{
|
||||
result.lfc_approximate_working_set_size_windows[minutes - 1] =
|
||||
lfc_approximate_working_set_size_seconds(minutes * 60, false);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(get_local_cache_state);
|
||||
|
||||
Datum
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user