Compare commits

..

23 Commits

Author SHA1 Message Date
Alex Chi Z
c939110d0a test(storcon): add test cases for 404 passthrough
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-16 17:19:40 -04:00
Alex Chi Z
7d4eb50d48 404 if no tenant if found
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-16 17:07:49 -04:00
Alex Chi Z
77271bca07 fix errors
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-16 14:05:39 -04:00
Alex Chi Z
cad28d273e fix(storcon): only convert 404 when tenant not found
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-16 14:00:56 -04:00
Krzysztof Szafrański
e2982ed3ec [proxy] Cache node info only for TTL, even if Redis is available (#12626)
This PR simplifies our node info cache. Now we'll store entries for at
most the TTL duration, even if Redis notifications are available. This
will allow us to cache intermittent errors later (e.g. due to rate
limits) with more predictable behavior.

Related to https://github.com/neondatabase/cloud/issues/19353
2025-07-16 16:23:05 +00:00
Tristan Partin
9e154a8130 PG: smooth max wal rate (#12514)
## Problem
We were only resetting the limit in the wal proposer. If backends are
back pressured, it might take a while for the wal proposer to receive a
new WAL to reset the limit.

## Summary of changes
Backend also checks the time and resets the limit.

## How is this tested?
pgbench has more smooth tps

Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
Co-authored-by: Haoyu Huang <haoyu.huang@databricks.com>
2025-07-16 16:11:25 +00:00
JC Grünhage
79d72c94e8 reformat cargo install invocations in build-tools image (#12629)
## Problem
Same change with different formatting happened in multiple branches.

## Summary of changes
Realign formatting with the other branch.
2025-07-16 16:02:07 +00:00
Alex Chi Z.
80e5771c67 fix(storcon): passthrough 404 as 503 during migrations (#12620)
## Problem

close LKB-270, close LKB-253

We periodically saw pageserver returns 404 -> storcon converts it to 500
to cplane, and causing branch operations fail. This is due to storcon is
migrating tenants across pageservers and the request was forwarded from
the storcon to pageservers while the tenant was not attached yet. Such
operations should be retried from cplane and storcon should return 503
in such cases.

## Summary of changes

- Refactor `tenant_timeline_lsn_lease` to have a single function process
and passthrough such requests: `collect_tenant_shards` for collecting
all shards and checking if they're consistent with the observed state,
`process_result_and_passthrough_errors` to convert 404 into 503 if
necessary.
- `tenant_shard_node` also checks observed state now.

Note that for passthrough shard0, we originally had a check to convert
404 to 503:

```
    // Transform 404 into 503 if we raced with a migration
    if resp.status() == reqwest::StatusCode::NOT_FOUND {
        // Look up node again: if we migrated it will be different
        let new_node = service.tenant_shard_node(tenant_shard_id).await?;
        if new_node.get_id() != node.get_id() {
            // Rather than retry here, send the client a 503 to prompt a retry: this matches
            // the pageserver's use of 503, and all clients calling this API should retry on 503.
            return Err(ApiError::ResourceUnavailable(
                format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
            ));
        }
    }
```

However, this only checks the intent state. It is possible that the
migration is in progress before/after the request is processed and
intent state is always the same throughout the API call, therefore 404
not being processed by this branch.

Also, not sure about if this new code is correct or not, need second
eyes on that:

```
// As a reconciliation is in flight, we do not have the observed state yet, and therefore we assume it is always inconsistent.
Ok((node.clone(), false))
```

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-16 15:51:20 +00:00
Aleksandr Sarantsev
1178f6fe7c pageserver: Downgrade log level of 'No broker updates' (#12627)
## Problem

The warning message was seen during deployment, but it's actually OK.

## Summary of changes

- Treat `"No broker updates received for a while ..."` as an info
message.

Co-authored-by: Aleksandr Sarantsev <aleksandr.sarantsev@databricks.com>
2025-07-16 15:02:01 +00:00
Vlad Lazar
8b18d8b31b safekeeper: add global disk usage utilization limit (#12605)
N.B: No-op for the neon-env.

## Problem

We added a per-timeline disk utilization protection circuit breaker,
which will stop the safekeeper from accepting more WAL writes if the
disk utilization by the timeline has exceeded a configured limit. We
mainly designed the mechanism as a guard against WAL upload/backup bugs,
and we assumed that as long as WAL uploads are proceeding as normal we
will not run into disk pressure. This turned out to be not true. In one
of our load tests where we have 500 PGs ingesting data at the same time,
safekeeper disk utilization started to creep up even though WAL uploads
were completely normal (we likely just maxed out our S3 upload bandwidth
from the single SK). This means the per-timeline disk utilization
protection won't be enough if too many timelines are ingesting data at
the same time.

## Summary of changes

Added a global disk utilization protection circuit breaker which will
stop a safekeeper from accepting more WAL writes if the total disk usage
on the safekeeper (across all tenants) exceeds a limit. We implemented
this circuit breaker through two parts:

1. A "global disk usage watcher" background task that runs at a
configured interval (default every minute) to see how much disk space is
being used in the safekeeper's filesystem. This background task also
performs the check against the limit and publishes the result to a
global atomic boolean flag.
2. The `hadron_check_disk_usage()` routine (in `timeline.rs`) now also
checks this global boolean flag published in the step above, and fails
the `WalAcceptor` (triggers the circuit breaker) if the flag was raised.

The disk usage limit is disabled by default.
It can be tuned with the `--max-global-disk-usage-ratio` CLI arg.

## How is this tested?

Added integration test
`test_wal_acceptor.py::test_global_disk_usage_limit`.

Also noticed that I haven't been using the `wait_until(f)` test function
correctly (the `f` passed in is supposed to raise an exception if the
condition is not met, instead of returning `False`...). Fixed it in both
circuit breaker tests.

---------

Co-authored-by: William Huang <william.huang@databricks.com>
2025-07-16 14:43:17 +00:00
Vlad Lazar
3e4cbaed67 storcon: validate intent state before applying optimization (#12593)
## Problem

In the gap between picking an optimization and applying it, something
might insert a change to the intent state that makes it incompatible.
If the change is done via the `schedule()` method, we are covered by the
increased sequence number, but otherwise we can panic if we violate the
intent state invariants.

## Summary of Changes

Validate the optimization right before applying it. Since we hold the
service lock at that point, nothing else can sneak in.

Closes LKB-65
2025-07-16 14:37:40 +00:00
Conrad Ludgate
c71aea0223 proxy: for json logging, only use callsite IDs if span name is duplicated (#12625)
## Problem

We run multiple proxies, we get logs like

```
... spans={"http_conn#22":{"conn_id": ...
... spans={"http_conn#24":{"conn_id": ...
```

these are the same span, and the difference is confusing.

## Summary of changes

Introduce a counter per span name, rather than a global counter. If the
counter is 0, no change to the span name is made.

To follow up: see which span names are duplicated within the codebase in
different callsites
2025-07-16 13:29:18 +00:00
Conrad Ludgate
87915df2fa proxy: replace serde_json with our new json ser crate in the logging impl (#12602)
This doesn't solve any particular problem, but it does simplify some of
the code that was forced to round-trip through verbose Serialize impls.
2025-07-16 13:27:00 +00:00
Alexander Bayandin
caca08fe78 CI: rework and merge lint-openapi-spec and validate-compute-manifest jobs (#12575)
## Problem

We have several linters that use Node.js, but they are currently set up
differently, both locally and on CI.

## Summary of changes
- Add Node.js to `build-tools` image
- Move `compute/package.json` -> `build-tools/package.json` and add
`redocly` to it `@redocly/cli`
- Unify and merge into one job `lint-openapi-spec` and
`validate-compute-manifest`
2025-07-16 11:08:27 +00:00
Alexander Bayandin
0c99f16c60 CI(run-python-test-set): don't collect code coverage for real (#12611)
## Problem

neondatabase/neon#12601 did't compleatly disable writing `*.profraw`
files, but instead of `/tmp/coverage` it started to write into the
current directory

## Summary of changes
- Set `LLVM_PROFILE_FILE=/dev/null` to avoing writing `*.profraw` at all
2025-07-16 08:26:52 +00:00
Alexey Kondratov
dd7fff655a feat(compute): Introduce privileged_role_name parameter (#12539)
## Problem

Currently `neon_superuser` is hardcoded in many places. It makes it
harder to reuse the same code in different envs.

## Summary of changes

Parametrize `neon_superuser` in `compute_ctl` via
`--privileged-role-name` and in `neon` extensions via
`neon.privileged_role_name`, so it's now possible to use different
'superuser' role names if needed. Everything still defaults to
`neon_superuser`, so no control plane code changes are needed and I
intentionally do not touch regression and migrations tests.

Postgres PRs:
- https://github.com/neondatabase/postgres/pull/674
- https://github.com/neondatabase/postgres/pull/675
- https://github.com/neondatabase/postgres/pull/676
- https://github.com/neondatabase/postgres/pull/677

Cloud PR:
- https://github.com/neondatabase/cloud/pull/31138
2025-07-15 20:22:57 +00:00
quantumish
809633903d Move ShmemHandle into separate module, tweak documentation (#12595)
Initial PR for the hashmap behind the updated LFC implementation. This
refactors `neon-shmem` so that the actual shared memory utilities are in
a separate module within the crate. Beyond that, it slightly changes
some of the docstrings so that they play nicer with `cargo doc`.
2025-07-15 17:40:40 +00:00
Arpad Müller
5c934efb29 Don't depend on the postgres_ffi just for one type (#12610)
We don't want to depend on postgres_ffi in an API crate. If there is no
such dependency, we can compile stuff like `storcon_cli` without needing
a full working postgres build. Fixes regression of #12548 (before we
could compile it).
2025-07-15 17:28:08 +00:00
Heikki Linnakangas
5c9c3b3317 Misc cosmetic cleanups (#12598)
- Remove a few obsolete "allowed error messages" from tests. The
pageserver doesn't emit those messages anymore.

- Remove misplaced and outdated docstring comment from
`test_tenants.py`. A docstring is supposed to be the first thing in a
function, but we had added some code before it. And it was outdated, as
we haven't supported running without safekeepers for a long time.

- Fix misc typos in comments

- Remove obsolete comment about backwards compatibility with safekeepers
without `TIMELINE_STATUS` API. All safekeepers have it by now.
2025-07-15 14:36:28 +00:00
Alexander Bayandin
921a4f2009 CI(run-python-test-set): don't collect code coverage (#12601)
## Problem

We don't use code coverage produced by `regress-tests`
(neondatabase/neon#6798), so there's no need to collect it. Potentially,
disabling it should reduce the load on disks and improve the stability
of debug builds.

## Summary of changes
- Disable code coverage collection for regression tests
2025-07-15 11:16:29 +00:00
dependabot[bot]
eb93c3e3c6 build(deps): bump aiohttp from 3.10.11 to 3.12.14 in the pip group across 1 directory (#12600)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-07-15 11:06:58 +00:00
Alexander Bayandin
7a7ab2a1d1 Move build-tools.Dockerfile -> build-tools/Dockerfile (#12590)
## Problem

This is a prerequisite for neondatabase/neon#12575 to keep all things
relevant to `build-tools` image in a single directory

## Summary of changes
- Rename `build_tools/` to `build-tools/`
- Move `build-tools.Dockerfile` to `build-tools/Dockerfile`
2025-07-15 10:45:49 +00:00
Krzysztof Szafrański
ff526a1051 [proxy] Recognize more cplane errors, use retry_delay_ms as TTL (#12543)
## Problem

Not all cplane errors are properly recognized and cached/retried.

## Summary of changes

Add more cplane error reasons. Also, use retry_delay_ms as cache TTL if
present.

Related to https://github.com/neondatabase/cloud/issues/19353
2025-07-15 07:42:48 +00:00
127 changed files with 5291 additions and 1385 deletions

View File

@@ -27,4 +27,4 @@
!storage_controller/
!vendor/postgres-*/
!workspace_hack/
!build_tools/patches
!build-tools/patches

View File

@@ -176,7 +176,13 @@ runs:
fi
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
# We don't use code coverage for regression tests (the step is disabled),
# so there's no need to collect it.
# Ref https://github.com/neondatabase/neon/issues/4540
# cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=()
# Explicitly set LLVM_PROFILE_FILE to /dev/null to avoid writing *.profraw files
export LLVM_PROFILE_FILE=/dev/null
else
cov_prefix=()
fi

View File

@@ -150,7 +150,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
- name: Cache postgres v15 build
id: cache_pg_15
@@ -162,7 +162,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
- name: Cache postgres v16 build
id: cache_pg_16
@@ -174,7 +174,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
- name: Cache postgres v17 build
id: cache_pg_17
@@ -186,7 +186,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
- name: Build all
# Note: the Makefile picks up BUILD_TYPE and CARGO_PROFILE from the env variables

View File

@@ -72,7 +72,7 @@ jobs:
ARCHS: ${{ inputs.archs || '["x64","arm64"]' }}
DEBIANS: ${{ inputs.debians || '["bullseye","bookworm"]' }}
IMAGE_TAG: |
${{ hashFiles('build-tools.Dockerfile',
${{ hashFiles('build-tools/Dockerfile',
'.github/workflows/build-build-tools-image.yml') }}
run: |
echo "archs=${ARCHS}" | tee -a ${GITHUB_OUTPUT}
@@ -144,7 +144,7 @@ jobs:
- uses: docker/build-push-action@471d1dc4e07e5cdedd4c2171150001c434f0b7a4 # v6.15.0
with:
file: build-tools.Dockerfile
file: build-tools/Dockerfile
context: .
provenance: false
push: true

View File

@@ -87,22 +87,27 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
lint-openapi-spec:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
lint-yamls:
needs: [ meta, check-permissions, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # v3.4.0
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- run: make -C compute manifest-schema-validation
- run: make lint-openapi-spec
check-codestyle-python:
@@ -217,28 +222,6 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
secrets: inherit
validate-compute-manifest:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up Node.js
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: '24'
- name: Validate manifest against schema
run: |
make -C compute manifest-schema-validation
build-and-test-locally:
needs: [ meta, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes.

3
.gitignore vendored
View File

@@ -29,3 +29,6 @@ docker-compose/docker-compose-parallel.yml
# pgindent typedef lists
*.list
# Node
**/node_modules/

14
Cargo.lock generated
View File

@@ -1323,6 +1323,7 @@ dependencies = [
"aws-smithy-types",
"axum",
"axum-extra",
"base64 0.22.1",
"bytes",
"camino",
"cfg-if",
@@ -1460,6 +1461,7 @@ dependencies = [
"compute_api",
"endpoint_storage",
"futures",
"http-utils",
"humantime",
"humantime-serde",
"hyper 0.14.30",
@@ -2892,11 +2894,13 @@ dependencies = [
"fail",
"futures",
"hyper 0.14.30",
"itertools 0.10.5",
"jemalloc_pprof",
"jsonwebtoken",
"metrics",
"once_cell",
"pprof",
"regex",
"routerify",
"rustls 0.23.27",
"rustls-pemfile 2.1.1",
@@ -4378,6 +4382,7 @@ dependencies = [
"pageserver_compaction",
"pageserver_page_api",
"pem",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"postgres_backend",
@@ -4451,6 +4456,7 @@ dependencies = [
"humantime-serde",
"itertools 0.10.5",
"nix 0.30.1",
"once_cell",
"postgres_backend",
"postgres_ffi_types",
"postgres_versioninfo",
@@ -5003,6 +5009,7 @@ dependencies = [
name = "postgres_versioninfo"
version = "0.1.0"
dependencies = [
"anyhow",
"serde",
"serde_repr",
"thiserror 1.0.69",
@@ -5023,6 +5030,7 @@ dependencies = [
"tokio",
"tokio-util",
"tracing",
"tracing-utils",
"workspace_hack",
]
@@ -6196,6 +6204,7 @@ dependencies = [
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"nix 0.30.1",
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
@@ -6203,6 +6212,7 @@ dependencies = [
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
"postgres_ffi_types",
"postgres_versioninfo",
"pprof",
"pq_proto",
@@ -6247,7 +6257,7 @@ dependencies = [
"anyhow",
"const_format",
"pageserver_api",
"postgres_ffi",
"postgres_ffi_types",
"postgres_versioninfo",
"pq_proto",
"serde",
@@ -8183,6 +8193,7 @@ dependencies = [
"serde",
"serde_assert",
"serde_json",
"serde_with",
"signal-hook",
"strum",
"strum_macros",
@@ -8757,7 +8768,6 @@ dependencies = [
"once_cell",
"p256 0.13.2",
"parquet",
"percent-encoding",
"prettyplease",
"proc-macro2",
"prost 0.13.5",

View File

@@ -74,6 +74,7 @@ aws-smithy-async = { version = "1.2.1", default-features = false, features=["rt-
aws-smithy-types = "1.2"
aws-credential-types = "1.2.0"
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
base64 = "0.22"
@@ -118,6 +119,7 @@ humantime = "2.2"
humantime-serde = "1.1.1"
hyper0 = { package = "hyper", version = "0.14" }
hyper = "1.4"
hyper-util = "0.1"
tokio-tungstenite = "0.21.0"
indexmap = { version = "2", features = ["serde"] }
indoc = "2"
@@ -166,6 +168,7 @@ rpds = "0.13"
rustc-hash = "1.1.0"
rustls = { version = "0.23.16", default-features = false }
rustls-pemfile = "2"
rustls-pki-types = "1.11"
scopeguard = "1.1"
sysinfo = "0.29.2"
sd-notify = "0.4.1"
@@ -211,10 +214,12 @@ tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
# This revision uses opentelemetry 0.27. There's no tag for it.
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"
tracing-log = "0.2"
tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"
test-log = { version = "0.2.17", default-features = false, features = ["log"] }
@@ -254,6 +259,7 @@ endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
@@ -275,6 +281,7 @@ storage_controller_client = { path = "./storage_controller/client" }
tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" }
tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
utils = { version = "0.1", path = "./libs/utils/" }
vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" }
wal_decoder = { version = "0.1", path = "./libs/wal_decoder" }
walproposer = { version = "0.1", path = "./libs/walproposer/" }

View File

@@ -220,11 +220,15 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
setup-pre-commit-hook:
ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit
build-tools/node_modules: build-tools/package.json
cd build-tools && $(if $(CI),npm ci,npm install)
touch build-tools/node_modules
.PHONY: lint-openapi-spec
lint-openapi-spec:
lint-openapi-spec: build-tools/node_modules
# operation-2xx-response: pageserver timeline delete returns 404 on success
find . -iname "openapi_spec.y*ml" -exec\
docker run --rm -v ${PWD}:/spec ghcr.io/redocly/cli:1.34.4\
npx --prefix=build-tools/ redocly\
--skip-rule=operation-operationId --skip-rule=operation-summary --extends=minimal\
--skip-rule=no-server-example.com --skip-rule=operation-2xx-response\
lint {} \+

View File

@@ -35,7 +35,7 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused=on\ntimeout=15\ntries=5\nretry-on-host-error=on\n" > /root/.wgetrc && \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
COPY build_tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
COPY build-tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \
@@ -188,6 +188,12 @@ RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install node
ENV NODE_VERSION=24
RUN curl -fsSL https://deb.nodesource.com/setup_${NODE_VERSION}.x | bash - \
&& apt install -y nodejs \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install docker
RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION} stable" > /etc/apt/sources.list.d/docker.list \
@@ -311,14 +317,14 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools rustfmt clippy && \
cargo install rustfilt --version ${RUSTFILT_VERSION} --locked && \
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} --locked && \
cargo install cargo-deny --version ${CARGO_DENY_VERSION} --locked && \
cargo install cargo-hack --version ${CARGO_HACK_VERSION} --locked && \
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} --locked && \
cargo install cargo-chef --version ${CARGO_CHEF_VERSION} --locked && \
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} --locked \
--features postgres-bundled --no-default-features && \
cargo install rustfilt --locked --version ${RUSTFILT_VERSION} && \
cargo install cargo-hakari --locked --version ${CARGO_HAKARI_VERSION} && \
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
cargo install cargo-hack --locked --version ${CARGO_HACK_VERSION} && \
cargo install cargo-nextest --locked --version ${CARGO_NEXTEST_VERSION} && \
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
cargo install diesel_cli --locked --version ${CARGO_DIESEL_CLI_VERSION} \
--features postgres-bundled --no-default-features && \
rm -rf /home/nonroot/.cargo/registry && \
rm -rf /home/nonroot/.cargo/git

3189
build-tools/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

8
build-tools/package.json Normal file
View File

@@ -0,0 +1,8 @@
{
"name": "build-tools",
"private": true,
"devDependencies": {
"@redocly/cli": "1.34.4",
"@sourcemeta/jsonschema": "10.0.0"
}
}

View File

@@ -50,9 +50,9 @@ jsonnetfmt-format:
jsonnetfmt --in-place $(jsonnet_files)
.PHONY: manifest-schema-validation
manifest-schema-validation: node_modules
node_modules/.bin/jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
manifest-schema-validation: ../build-tools/node_modules
npx --prefix=../build-tools/ jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
node_modules: package.json
npm install
touch node_modules
../build-tools/node_modules: ../build-tools/package.json
cd ../build-tools && $(if $(CI),npm ci,npm install)
touch ../build-tools/node_modules

View File

@@ -9,7 +9,7 @@
#
# build-tools: This contains Rust compiler toolchain and other tools needed at compile
# time. This is also used for the storage builds. This image is defined in
# build-tools.Dockerfile.
# build-tools/Dockerfile.
#
# build-deps: Contains C compiler, other build tools, and compile-time dependencies
# needed to compile PostgreSQL and most extensions. (Some extensions need
@@ -115,7 +115,7 @@ ARG EXTENSIONS=all
FROM $BASE_IMAGE_SHA AS build-deps
ARG DEBIAN_VERSION
# Keep in sync with build-tools.Dockerfile
# Keep in sync with build-tools/Dockerfile
ENV PROTOC_VERSION=25.1
# Use strict mode for bash to catch errors early
@@ -170,7 +170,29 @@ RUN case $DEBIAN_VERSION in \
FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION:?} postgres
COPY compute/patches/postgres_fdw.patch .
COPY compute/patches/pg_stat_statements_pg14-16.patch .
COPY compute/patches/pg_stat_statements_pg17.patch .
RUN cd postgres && \
# Apply patches to some contrib extensions
# For example, we need to grant EXECUTE on pg_stat_statements_reset() to {privileged_role_name}.
# In vanilla Postgres this function is limited to Postgres role superuser.
# In Neon we have {privileged_role_name} role that is not a superuser but replaces superuser in some cases.
# We could add the additional grant statements to the Postgres repository but it would be hard to maintain,
# whenever we need to pick up a new Postgres version and we want to limit the changes in our Postgres fork,
# so we do it here.
case "${PG_VERSION}" in \
"v14" | "v15" | "v16") \
patch -p1 < /pg_stat_statements_pg14-16.patch; \
;; \
"v17") \
patch -p1 < /pg_stat_statements_pg17.patch; \
;; \
*) \
# To do not forget to migrate patches to the next major version
echo "No contrib patches for this PostgreSQL version" && exit 1;; \
esac && \
patch -p1 < /postgres_fdw.patch && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3 -fsigned-char' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION:?}" != "v14" ]; then \
@@ -184,8 +206,6 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/dblink.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgres_fdw.control && \
file=/usr/local/pgsql/share/extension/postgres_fdw--1.0.sql && [ -e $file ] && \
echo 'GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw TO neon_superuser;' >> $file && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/bloom.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
@@ -195,34 +215,7 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control && \
# We need to grant EXECUTE on pg_stat_statements_reset() to neon_superuser.
# In vanilla postgres this function is limited to Postgres role superuser.
# In neon we have neon_superuser role that is not a superuser but replaces superuser in some cases.
# We could add the additional grant statements to the postgres repository but it would be hard to maintain,
# whenever we need to pick up a new postgres version and we want to limit the changes in our postgres fork,
# so we do it here.
for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql; do \
filename=$(basename "$file"); \
# Note that there are no downgrade scripts for pg_stat_statements, so we \
# don't have to modify any downgrade paths or (much) older versions: we only \
# have to make sure every creation of the pg_stat_statements_reset function \
# also adds execute permissions to the neon_superuser.
case $filename in \
pg_stat_statements--1.4.sql) \
# pg_stat_statements_reset is first created with 1.4
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO neon_superuser;' >> $file; \
;; \
pg_stat_statements--1.6--1.7.sql) \
# Then with the 1.6-1.7 migration it is re-created with a new signature, thus add the permissions back
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO neon_superuser;' >> $file; \
;; \
pg_stat_statements--1.10--1.11.sql) \
# Then with the 1.10-1.11 migration it is re-created with a new signature again, thus add the permissions back
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) TO neon_superuser;' >> $file; \
;; \
esac; \
done;
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
# Set PATH for all the subsequent build steps
ENV PATH="/usr/local/pgsql/bin:$PATH"
@@ -1524,7 +1517,7 @@ WORKDIR /ext-src
COPY compute/patches/pg_duckdb_v031.patch .
COPY compute/patches/duckdb_v120.patch .
# pg_duckdb build requires source dir to be a git repo to get submodules
# allow neon_superuser to execute some functions that in pg_duckdb are available to superuser only:
# allow {privileged_role_name} to execute some functions that in pg_duckdb are available to superuser only:
# - extension management function duckdb.install_extension()
# - access to duckdb.extensions table and its sequence
RUN git clone --depth 1 --branch v0.3.1 https://github.com/duckdb/pg_duckdb.git pg_duckdb-src && \
@@ -1790,7 +1783,7 @@ RUN set -e \
#########################################################################################
FROM build-deps AS exporters
ARG TARGETARCH
# Keep sql_exporter version same as in build-tools.Dockerfile and
# Keep sql_exporter version same as in build-tools/Dockerfile and
# test_runner/regress/test_compute_metrics.py
# See comment on the top of the file regading `echo`, `-e` and `\n`
RUN if [ "$TARGETARCH" = "amd64" ]; then\

View File

@@ -1,7 +0,0 @@
{
"name": "neon-compute",
"private": true,
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
}

View File

@@ -1,22 +1,26 @@
diff --git a/sql/anon.sql b/sql/anon.sql
index 0cdc769..b450327 100644
index 0cdc769..5eab1d6 100644
--- a/sql/anon.sql
+++ b/sql/anon.sql
@@ -1141,3 +1141,15 @@ $$
@@ -1141,3 +1141,19 @@ $$
-- TODO : https://en.wikipedia.org/wiki/L-diversity
-- TODO : https://en.wikipedia.org/wiki/T-closeness
+
+-- NEON Patches
+
+GRANT ALL ON SCHEMA anon to neon_superuser;
+GRANT ALL ON ALL TABLES IN SCHEMA anon TO neon_superuser;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ IF current_setting('server_version_num')::int >= 150000 THEN
+ GRANT SET ON PARAMETER anon.transparent_dynamic_masking TO neon_superuser;
+ END IF;
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT ALL ON SCHEMA anon to %I', privileged_role_name);
+ EXECUTE format('GRANT ALL ON ALL TABLES IN SCHEMA anon TO %I', privileged_role_name);
+
+ IF current_setting('server_version_num')::int >= 150000 THEN
+ EXECUTE format('GRANT SET ON PARAMETER anon.transparent_dynamic_masking TO %I', privileged_role_name);
+ END IF;
+END $$;
diff --git a/sql/init.sql b/sql/init.sql
index 7da6553..9b6164b 100644

View File

@@ -21,13 +21,21 @@ index 3235cc8..6b892bc 100644
include Makefile.global
diff --git a/sql/pg_duckdb--0.2.0--0.3.0.sql b/sql/pg_duckdb--0.2.0--0.3.0.sql
index d777d76..af60106 100644
index d777d76..3b54396 100644
--- a/sql/pg_duckdb--0.2.0--0.3.0.sql
+++ b/sql/pg_duckdb--0.2.0--0.3.0.sql
@@ -1056,3 +1056,6 @@ GRANT ALL ON FUNCTION duckdb.cache(TEXT, TEXT) TO PUBLIC;
@@ -1056,3 +1056,14 @@ GRANT ALL ON FUNCTION duckdb.cache(TEXT, TEXT) TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_info() TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_delete(TEXT) TO PUBLIC;
GRANT ALL ON PROCEDURE duckdb.recycle_ddb() TO PUBLIC;
+GRANT ALL ON FUNCTION duckdb.install_extension(TEXT) TO neon_superuser;
+GRANT ALL ON TABLE duckdb.extensions TO neon_superuser;
+GRANT ALL ON SEQUENCE duckdb.extensions_table_seq TO neon_superuser;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT ALL ON FUNCTION duckdb.install_extension(TEXT) TO %I', privileged_role_name);
+ EXECUTE format('GRANT ALL ON TABLE duckdb.extensions TO %I', privileged_role_name);
+ EXECUTE format('GRANT ALL ON SEQUENCE duckdb.extensions_table_seq TO %I', privileged_role_name);
+END $$;

View File

@@ -0,0 +1,34 @@
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
index 58cdf600fce..8be57a996f6 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
@@ -46,3 +46,12 @@ GRANT SELECT ON pg_stat_statements TO PUBLIC;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset() FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO %I', privileged_role_name);
+END $$;
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
index 6fc3fed4c93..256345a8f79 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
@@ -20,3 +20,12 @@ LANGUAGE C STRICT PARALLEL SAFE;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO %I', privileged_role_name);
+END $$;

View File

@@ -0,0 +1,52 @@
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql b/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql
index 0bb2c397711..32764db1d8b 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql
@@ -80,3 +80,12 @@ LANGUAGE C STRICT PARALLEL SAFE;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) TO %I', privileged_role_name);
+END $$;
\ No newline at end of file
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
index 58cdf600fce..8be57a996f6 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
@@ -46,3 +46,12 @@ GRANT SELECT ON pg_stat_statements TO PUBLIC;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset() FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO %I', privileged_role_name);
+END $$;
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
index 6fc3fed4c93..256345a8f79 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
@@ -20,3 +20,12 @@ LANGUAGE C STRICT PARALLEL SAFE;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO %I', privileged_role_name);
+END $$;

View File

@@ -0,0 +1,17 @@
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql
index a0f0fc1bf45..ee077f2eea6 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql
@@ -16,3 +16,12 @@ LANGUAGE C STRICT;
CREATE FOREIGN DATA WRAPPER postgres_fdw
HANDLER postgres_fdw_handler
VALIDATOR postgres_fdw_validator;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw TO %I', privileged_role_name);
+END $$;

View File

@@ -11,6 +11,7 @@ testing = ["fail/failpoints"]
[dependencies]
async-compression.workspace = true
base64.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-sdk-kms.workspace = true

View File

@@ -87,6 +87,14 @@ struct Cli {
#[arg(short = 'C', long, value_name = "DATABASE_URL")]
pub connstr: String,
#[arg(
long,
default_value = "neon_superuser",
value_name = "PRIVILEGED_ROLE_NAME",
value_parser = Self::parse_privileged_role_name
)]
pub privileged_role_name: String,
#[cfg(target_os = "linux")]
#[arg(long, default_value = "neon-postgres")]
pub cgroup: String,
@@ -149,6 +157,21 @@ impl Cli {
Ok(url)
}
/// For simplicity, we do not escape `privileged_role_name` anywhere in the code.
/// Since it's a system role, which we fully control, that's fine. Still, let's
/// validate it to avoid any surprises.
fn parse_privileged_role_name(value: &str) -> Result<String> {
use regex::Regex;
let pattern = Regex::new(r"^[a-z_]+$").unwrap();
if !pattern.is_match(value) {
bail!("--privileged-role-name can only contain lowercase letters and underscores")
}
Ok(value.to_string())
}
}
fn main() -> Result<()> {
@@ -178,6 +201,7 @@ fn main() -> Result<()> {
ComputeNodeParams {
compute_id: cli.compute_id,
connstr,
privileged_role_name: cli.privileged_role_name.clone(),
pgdata: cli.pgdata.clone(),
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
@@ -327,4 +351,49 @@ mod test {
])
.expect_err("URL parameters are not allowed");
}
#[test]
fn verify_privileged_role_name() {
// Valid name
let cli = Cli::parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"my_superuser",
]);
assert_eq!(cli.privileged_role_name, "my_superuser");
// Invalid names
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"NeonSuperuser",
])
.expect_err("uppercase letters are not allowed");
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"$'neon_superuser",
])
.expect_err("special characters are not allowed");
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"",
])
.expect_err("empty name is not allowed");
}
}

View File

@@ -74,12 +74,20 @@ const DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL: u64 = 3600;
/// Static configuration params that don't change after startup. These mostly
/// come from the CLI args, or are derived from them.
#[derive(Clone, Debug)]
pub struct ComputeNodeParams {
/// The ID of the compute
pub compute_id: String,
// Url type maintains proper escaping
/// Url type maintains proper escaping
pub connstr: url::Url,
/// The name of the 'weak' superuser role, which we give to the users.
/// It follows the allow list approach, i.e., we take a standard role
/// and grant it extra permissions with explicit GRANTs here and there,
/// and core patches.
pub privileged_role_name: String,
pub resize_swap_on_bind: bool,
pub set_disk_quota_for_fs: Option<String>,
@@ -1286,9 +1294,7 @@ impl ComputeNode {
// In case of error, log and fail the check, but don't crash.
// We're playing it safe because these errors could be transient
// and we don't yet retry. Also being careful here allows us to
// be backwards compatible with safekeepers that don't have the
// TIMELINE_STATUS API yet.
// and we don't yet retry.
if responses.len() < quorum {
error!(
"failed sync safekeepers check {:?} {:?} {:?}",
@@ -1391,6 +1397,7 @@ impl ComputeNode {
self.create_pgdata()?;
config::write_postgres_conf(
pgdata_path,
&self.params,
&pspec.spec,
self.params.internal_http_port,
tls_config,
@@ -1739,6 +1746,7 @@ impl ComputeNode {
}
// Run migrations separately to not hold up cold starts
let params = self.params.clone();
tokio::spawn(async move {
let mut conf = conf.as_ref().clone();
conf.application_name("compute_ctl:migrations");
@@ -1750,7 +1758,7 @@ impl ComputeNode {
eprintln!("connection error: {e}");
}
});
if let Err(e) = handle_migrations(&mut client).await {
if let Err(e) = handle_migrations(params, &mut client).await {
error!("Failed to run migrations: {}", e);
}
}
@@ -1829,6 +1837,7 @@ impl ComputeNode {
let pgdata_path = Path::new(&self.params.pgdata);
config::write_postgres_conf(
pgdata_path,
&self.params,
&spec,
self.params.internal_http_port,
tls_config,

View File

@@ -9,6 +9,7 @@ use std::path::Path;
use compute_api::responses::TlsConfig;
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use crate::compute::ComputeNodeParams;
use crate::pg_helpers::{
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
};
@@ -41,6 +42,7 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
/// Create or completely rewrite configuration file specified by `path`
pub fn write_postgres_conf(
pgdata_path: &Path,
params: &ComputeNodeParams,
spec: &ComputeSpec,
extension_server_port: u16,
tls_config: &Option<TlsConfig>,
@@ -161,6 +163,12 @@ pub fn write_postgres_conf(
}
}
writeln!(
file,
"neon.privileged_role_name={}",
escape_conf_value(params.privileged_role_name.as_str())
)?;
// If there are any extra options in the 'settings' field, append those
if spec.cluster.settings.is_some() {
writeln!(file, "# Managed by compute_ctl: begin")?;

View File

@@ -0,0 +1 @@
ALTER ROLE {privileged_role_name} BYPASSRLS;

View File

@@ -1 +0,0 @@
ALTER ROLE neon_superuser BYPASSRLS;

View File

@@ -15,7 +15,7 @@ DO $$
DECLARE
role_name text;
BEGIN
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, 'neon_superuser', 'member')
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, '{privileged_role_name}', 'member')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
@@ -23,7 +23,7 @@ BEGIN
FOR role_name IN SELECT rolname FROM pg_roles
WHERE
NOT pg_has_role(rolname, 'neon_superuser', 'member') AND NOT starts_with(rolname, 'pg_')
NOT pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT starts_with(rolname, 'pg_')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
EXECUTE 'GRANT pg_create_subscription TO neon_superuser';
EXECUTE 'GRANT pg_create_subscription TO {privileged_role_name}';
END IF;
END $$;

View File

@@ -1 +0,0 @@
GRANT pg_monitor TO neon_superuser WITH ADMIN OPTION;

View File

@@ -0,0 +1 @@
GRANT pg_monitor TO {privileged_role_name} WITH ADMIN OPTION;

View File

@@ -1,4 +1,4 @@
-- SKIP: Deemed insufficient for allowing relations created by extensions to be
-- interacted with by neon_superuser without permission issues.
-- interacted with by {privileged_role_name} without permission issues.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO {privileged_role_name};

View File

@@ -1,4 +1,4 @@
-- SKIP: Deemed insufficient for allowing relations created by extensions to be
-- interacted with by neon_superuser without permission issues.
-- interacted with by {privileged_role_name} without permission issues.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO {privileged_role_name};

View File

@@ -1,3 +1,3 @@
-- SKIP: Moved inline to the handle_grants() functions.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO {privileged_role_name} WITH GRANT OPTION;

View File

@@ -1,3 +1,3 @@
-- SKIP: Moved inline to the handle_grants() functions.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO {privileged_role_name} WITH GRANT OPTION;

View File

@@ -1,7 +1,7 @@
DO $$
BEGIN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_export_snapshot TO neon_superuser';
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_log_standby_snapshot TO neon_superuser';
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_export_snapshot TO {privileged_role_name}';
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_log_standby_snapshot TO {privileged_role_name}';
END IF;
END $$;

View File

@@ -1 +0,0 @@
GRANT EXECUTE ON FUNCTION pg_show_replication_origin_status TO neon_superuser;

View File

@@ -0,0 +1 @@
GRANT EXECUTE ON FUNCTION pg_show_replication_origin_status TO {privileged_role_name};

View File

@@ -1 +0,0 @@
GRANT pg_signal_backend TO neon_superuser WITH ADMIN OPTION;

View File

@@ -0,0 +1 @@
GRANT pg_signal_backend TO {privileged_role_name} WITH ADMIN OPTION;

View File

@@ -9,6 +9,7 @@ use reqwest::StatusCode;
use tokio_postgres::Client;
use tracing::{error, info, instrument};
use crate::compute::ComputeNodeParams;
use crate::config;
use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
use crate::migration::MigrationRunner;
@@ -169,7 +170,7 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
}
#[instrument(skip_all)]
pub async fn handle_migrations(client: &mut Client) -> Result<()> {
pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -> Result<()> {
info!("handle migrations");
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@@ -178,26 +179,59 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> {
// Add new migrations in numerical order.
let migrations = [
include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
include_str!("./migrations/0002-alter_roles.sql"),
include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
include_str!(
"./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
&format!(
include_str!("./migrations/0001-add_bypass_rls_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
include_str!(
"./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
&format!(
include_str!("./migrations/0002-alter_roles.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0003-grant_pg_create_subscription_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0004-grant_pg_monitor_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0005-grant_all_on_tables_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0006-grant_all_on_sequences_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!(
"./migrations/0007-grant_all_on_tables_with_grant_option_to_privileged_role.sql"
),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!(
"./migrations/0008-grant_all_on_sequences_with_grant_option_to_privileged_role.sql"
),
privileged_role_name = params.privileged_role_name
),
include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
include_str!(
"./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
&format!(
include_str!(
"./migrations/0010-grant_snapshot_synchronization_funcs_to_privileged_role.sql"
),
privileged_role_name = params.privileged_role_name
),
include_str!(
"./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
&format!(
include_str!(
"./migrations/0011-grant_pg_show_replication_origin_status_to_privileged_role.sql"
),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0012-grant_pg_signal_backend_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
include_str!("./migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql"),
];
MigrationRunner::new(client, &migrations)

View File

@@ -13,14 +13,14 @@ use tokio_postgres::Client;
use tokio_postgres::error::SqlState;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use crate::compute::{ComputeNode, ComputeState};
use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState};
use crate::pg_helpers::{
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async,
get_existing_roles_async,
};
use crate::spec_apply::ApplySpecPhase::{
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateNeonSuperuser,
CreatePgauditExtension, CreatePgauditlogtofileExtension, CreateSchemaNeon,
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension,
CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon,
DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
@@ -49,6 +49,7 @@ impl ComputeNode {
// Proceed with post-startup configuration. Note, that order of operations is important.
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();
let params = Arc::new(self.params.clone());
let databases = get_existing_dbs_async(&client).await?;
let roles = get_existing_roles_async(&client)
@@ -157,6 +158,7 @@ impl ComputeNode {
let conf = Arc::new(conf);
let fut = Self::apply_spec_sql_db(
params.clone(),
spec.clone(),
conf,
ctx.clone(),
@@ -185,7 +187,7 @@ impl ComputeNode {
}
for phase in [
CreateNeonSuperuser,
CreatePrivilegedRole,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
@@ -195,6 +197,7 @@ impl ComputeNode {
] {
info!("Applying phase {:?}", &phase);
apply_operations(
params.clone(),
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
@@ -243,6 +246,7 @@ impl ComputeNode {
}
let fut = Self::apply_spec_sql_db(
params.clone(),
spec.clone(),
conf,
ctx.clone(),
@@ -293,6 +297,7 @@ impl ComputeNode {
for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
params.clone(),
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
@@ -313,7 +318,9 @@ impl ComputeNode {
/// May opt to not connect to databases that don't have any scheduled
/// operations. The function is concurrency-controlled with the provided
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
#[allow(clippy::too_many_arguments)] // TODO: needs bigger refactoring
async fn apply_spec_sql_db(
params: Arc<ComputeNodeParams>,
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
@@ -328,6 +335,7 @@ impl ComputeNode {
for subphase in subphases {
apply_operations(
params.clone(),
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
@@ -467,7 +475,7 @@ pub enum PerDatabasePhase {
#[derive(Clone, Debug)]
pub enum ApplySpecPhase {
CreateNeonSuperuser,
CreatePrivilegedRole,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
@@ -510,6 +518,7 @@ pub struct MutableApplyContext {
/// - No timeouts have (yet) been implemented.
/// - The caller is responsible for limiting and/or applying concurrency.
pub async fn apply_operations<'a, Fut, F>(
params: Arc<ComputeNodeParams>,
spec: Arc<ComputeSpec>,
ctx: Arc<RwLock<MutableApplyContext>>,
jwks_roles: Arc<HashSet<String>>,
@@ -527,7 +536,7 @@ where
debug!("Processing phase {:?}", &apply_spec_phase);
let ctx = ctx;
let mut ops = get_operations(&spec, &ctx, &jwks_roles, &apply_spec_phase)
let mut ops = get_operations(&params, &spec, &ctx, &jwks_roles, &apply_spec_phase)
.await?
.peekable();
@@ -588,14 +597,18 @@ where
/// sort/merge/batch execution, but for now this is a nice way to improve
/// batching behavior of the commands.
async fn get_operations<'a>(
params: &'a ComputeNodeParams,
spec: &'a ComputeSpec,
ctx: &'a RwLock<MutableApplyContext>,
jwks_roles: &'a HashSet<String>,
apply_spec_phase: &'a ApplySpecPhase,
) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
match apply_spec_phase {
ApplySpecPhase::CreateNeonSuperuser => Ok(Box::new(once(Operation {
query: include_str!("sql/create_neon_superuser.sql").to_string(),
ApplySpecPhase::CreatePrivilegedRole => Ok(Box::new(once(Operation {
query: format!(
include_str!("sql/create_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
comment: None,
}))),
ApplySpecPhase::DropInvalidDatabases => {
@@ -697,8 +710,9 @@ async fn get_operations<'a>(
None => {
let query = if !jwks_roles.contains(role.name.as_str()) {
format!(
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser {}",
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE {} {}",
role.name.pg_quote(),
params.privileged_role_name,
role.to_pg_options(),
)
} else {
@@ -849,8 +863,9 @@ async fn get_operations<'a>(
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on the database
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
query: format!(
"GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
db.name.pg_quote()
"GRANT ALL PRIVILEGES ON DATABASE {} TO {}",
db.name.pg_quote(),
params.privileged_role_name
),
comment: None,
},

View File

@@ -1,8 +0,0 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
THEN
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
END IF;
END
$$;

View File

@@ -0,0 +1,8 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{privileged_role_name}')
THEN
CREATE ROLE {privileged_role_name} CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
END IF;
END
$$;

View File

@@ -39,6 +39,7 @@ safekeeper_api.workspace = true
safekeeper_client.workspace = true
postgres_connection.workspace = true
storage_broker.workspace = true
http-utils.workspace = true
utils.workspace = true
whoami.workspace = true
endpoint_storage.workspace = true

View File

@@ -631,6 +631,10 @@ struct EndpointCreateCmdArgs {
help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
)]
allow_multiple: bool,
/// Only allow changing it on creation
#[clap(long, help = "Name of the privileged role for the endpoint")]
privileged_role_name: Option<String>,
}
#[derive(clap::Args)]
@@ -1480,6 +1484,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
args.grpc,
!args.update_catalog,
false,
args.privileged_role_name.clone(),
)?;
}
EndpointCmd::Start(args) => {

View File

@@ -99,6 +99,7 @@ pub struct EndpointConf {
features: Vec<ComputeFeature>,
cluster: Option<Cluster>,
compute_ctl_config: ComputeCtlConfig,
privileged_role_name: Option<String>,
}
//
@@ -199,6 +200,7 @@ impl ComputeControlPlane {
grpc: bool,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
privileged_role_name: Option<String>,
) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
@@ -236,6 +238,7 @@ impl ComputeControlPlane {
features: vec![],
cluster: None,
compute_ctl_config: compute_ctl_config.clone(),
privileged_role_name: privileged_role_name.clone(),
});
ep.create_endpoint_dir()?;
@@ -257,6 +260,7 @@ impl ComputeControlPlane {
features: vec![],
cluster: None,
compute_ctl_config,
privileged_role_name,
})?,
)?;
std::fs::write(
@@ -332,6 +336,9 @@ pub struct Endpoint {
/// The compute_ctl config for the endpoint's compute.
compute_ctl_config: ComputeCtlConfig,
/// The name of the privileged role for the endpoint.
privileged_role_name: Option<String>,
}
#[derive(PartialEq, Eq)]
@@ -432,6 +439,7 @@ impl Endpoint {
features: conf.features,
cluster: conf.cluster,
compute_ctl_config: conf.compute_ctl_config,
privileged_role_name: conf.privileged_role_name,
})
}
@@ -464,7 +472,7 @@ impl Endpoint {
conf.append("max_connections", "100");
conf.append("wal_level", "logical");
// wal_sender_timeout is the maximum time to wait for WAL replication.
// It also defines how often the walreciever will send a feedback message to the wal sender.
// It also defines how often the walreceiver will send a feedback message to the wal sender.
conf.append("wal_sender_timeout", "5s");
conf.append("listen_addresses", &self.pg_address.ip().to_string());
conf.append("port", &self.pg_address.port().to_string());
@@ -870,6 +878,10 @@ impl Endpoint {
cmd.arg("--dev");
}
if let Some(privileged_role_name) = self.privileged_role_name.clone() {
cmd.args(["--privileged-role-name", &privileged_role_name]);
}
let child = cmd.spawn()?;
// set up a scopeguard to kill & wait for the child in case we panic or bail below
let child = scopeguard::guard(child, |mut child| {

View File

@@ -75,7 +75,7 @@ CLI examples:
* AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"`
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
For local S3 installations, refer to the their documentation for name format and credentials.
For local S3 installations, refer to their documentation for name format and credentials.
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
Required sections are:

View File

@@ -12,10 +12,12 @@ camino.workspace = true
fail.workspace = true
futures.workspace = true
hyper0.workspace = true
itertools.workspace = true
jemalloc_pprof.workspace = true
jsonwebtoken.workspace = true
once_cell.workspace = true
pprof.workspace = true
regex.workspace = true
routerify.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true

View File

@@ -1,418 +1 @@
//! Shared memory utilities for neon communicator
use std::num::NonZeroUsize;
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use nix::errno::Errno;
use nix::sys::mman::MapFlags;
use nix::sys::mman::ProtFlags;
use nix::sys::mman::mmap as nix_mmap;
use nix::sys::mman::munmap as nix_munmap;
use nix::unistd::ftruncate as nix_ftruncate;
/// ShmemHandle represents a shared memory area that can be shared by processes over fork().
/// Unlike shared memory allocated by Postgres, this area is resizable, up to 'max_size' that's
/// specified at creation.
///
/// The area is backed by an anonymous file created with memfd_create(). The full address space for
/// 'max_size' is reserved up-front with mmap(), but whenever you call [`ShmemHandle::set_size`],
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
/// will cause the file to be expanded, but we might use mprotect() etc. to enforce that in the
/// future.
pub struct ShmemHandle {
/// memfd file descriptor
fd: OwnedFd,
max_size: usize,
// Pointer to the beginning of the shared memory area. The header is stored there.
shared_ptr: NonNull<SharedStruct>,
// Pointer to the beginning of the user data
pub data_ptr: NonNull<u8>,
}
/// This is stored at the beginning in the shared memory area.
struct SharedStruct {
max_size: usize,
/// Current size of the backing file. The high-order bit is used for the RESIZE_IN_PROGRESS flag
current_size: AtomicUsize,
}
const RESIZE_IN_PROGRESS: usize = 1 << 63;
const HEADER_SIZE: usize = std::mem::size_of::<SharedStruct>();
/// Error type returned by the ShmemHandle functions.
#[derive(thiserror::Error, Debug)]
#[error("{msg}: {errno}")]
pub struct Error {
pub msg: String,
pub errno: Errno,
}
impl Error {
fn new(msg: &str, errno: Errno) -> Error {
Error {
msg: msg.to_string(),
errno,
}
}
}
impl ShmemHandle {
/// Create a new shared memory area. To communicate between processes, the processes need to be
/// fork()'d after calling this, so that the ShmemHandle is inherited by all processes.
///
/// If the ShmemHandle is dropped, the memory is unmapped from the current process. Other
/// processes can continue using it, however.
pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result<ShmemHandle, Error> {
// create the backing anonymous file.
let fd = create_backing_file(name)?;
Self::new_with_fd(fd, initial_size, max_size)
}
fn new_with_fd(
fd: OwnedFd,
initial_size: usize,
max_size: usize,
) -> Result<ShmemHandle, Error> {
// We reserve the high-order bit for the RESIZE_IN_PROGRESS flag, and the actual size
// is a little larger than this because of the SharedStruct header. Make the upper limit
// somewhat smaller than that, because with anything close to that, you'll run out of
// memory anyway.
if max_size >= 1 << 48 {
panic!("max size {max_size} too large");
}
if initial_size > max_size {
panic!("initial size {initial_size} larger than max size {max_size}");
}
// The actual initial / max size is the one given by the caller, plus the size of
// 'SharedStruct'.
let initial_size = HEADER_SIZE + initial_size;
let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap();
// Reserve address space for it with mmap
//
// TODO: Use MAP_HUGETLB if possible
let start_ptr = unsafe {
nix_mmap(
None,
max_size,
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
MapFlags::MAP_SHARED,
&fd,
0,
)
}
.map_err(|e| Error::new("mmap failed: {e}", e))?;
// Reserve space for the initial size
enlarge_file(fd.as_fd(), initial_size as u64)?;
// Initialize the header
let shared: NonNull<SharedStruct> = start_ptr.cast();
unsafe {
shared.write(SharedStruct {
max_size: max_size.into(),
current_size: AtomicUsize::new(initial_size),
})
};
// The user data begins after the header
let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) };
Ok(ShmemHandle {
fd,
max_size: max_size.into(),
shared_ptr: shared,
data_ptr,
})
}
// return reference to the header
fn shared(&self) -> &SharedStruct {
unsafe { self.shared_ptr.as_ref() }
}
/// Resize the shared memory area. 'new_size' must not be larger than the 'max_size' specified
/// when creating the area.
///
/// This may only be called from one process/thread concurrently. We detect that case
/// and return an Error.
pub fn set_size(&self, new_size: usize) -> Result<(), Error> {
let new_size = new_size + HEADER_SIZE;
let shared = self.shared();
if new_size > self.max_size {
panic!(
"new size ({} is greater than max size ({})",
new_size, self.max_size
);
}
assert_eq!(self.max_size, shared.max_size);
// Lock the area by setting the bit in 'current_size'
//
// Ordering::Relaxed would probably be sufficient here, as we don't access any other memory
// and the posix_fallocate/ftruncate call is surely a synchronization point anyway. But
// since this is not performance-critical, better safe than sorry .
let mut old_size = shared.current_size.load(Ordering::Acquire);
loop {
if (old_size & RESIZE_IN_PROGRESS) != 0 {
return Err(Error::new(
"concurrent resize detected",
Errno::UnknownErrno,
));
}
match shared.current_size.compare_exchange(
old_size,
new_size,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => old_size = x,
}
}
// Ok, we got the lock.
//
// NB: If anything goes wrong, we *must* clear the bit!
let result = {
use std::cmp::Ordering::{Equal, Greater, Less};
match new_size.cmp(&old_size) {
Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| {
Error::new("could not shrink shmem segment, ftruncate failed: {e}", e)
}),
Equal => Ok(()),
Greater => enlarge_file(self.fd.as_fd(), new_size as u64),
}
};
// Unlock
shared.current_size.store(
if result.is_ok() { new_size } else { old_size },
Ordering::Release,
);
result
}
/// Returns the current user-visible size of the shared memory segment.
///
/// NOTE: a concurrent set_size() call can change the size at any time. It is the caller's
/// responsibility not to access the area beyond the current size.
pub fn current_size(&self) -> usize {
let total_current_size =
self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS;
total_current_size - HEADER_SIZE
}
}
impl Drop for ShmemHandle {
fn drop(&mut self) {
// SAFETY: The pointer was obtained from mmap() with the given size.
// We unmap the entire region.
let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) };
// The fd is dropped automatically by OwnedFd.
}
}
/// Create a "backing file" for the shared memory area. On Linux, use memfd_create(), to create an
/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for
/// development and testing, but in production we want the file to stay in memory.
///
/// disable 'unused_variables' warnings, because in the macos path, 'name' is unused.
#[allow(unused_variables)]
fn create_backing_file(name: &str) -> Result<OwnedFd, Error> {
#[cfg(not(target_os = "macos"))]
{
nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty())
.map_err(|e| Error::new("memfd_create failed: {e}", e))
}
#[cfg(target_os = "macos")]
{
let file = tempfile::tempfile().map_err(|e| {
Error::new(
"could not create temporary file to back shmem area: {e}",
nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)),
)
})?;
Ok(OwnedFd::from(file))
}
}
fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> {
// Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that
// we don't get a segfault later when trying to actually use it.
#[cfg(not(target_os = "macos"))]
{
nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| {
Error::new(
"could not grow shmem segment, posix_fallocate failed: {e}",
e,
)
})
}
// As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate'
#[cfg(target_os = "macos")]
{
nix::unistd::ftruncate(fd, size as i64)
.map_err(|e| Error::new("could not grow shmem segment, ftruncate failed: {e}", e))
}
}
#[cfg(test)]
mod tests {
use super::*;
use nix::unistd::ForkResult;
use std::ops::Range;
/// check that all bytes in given range have the expected value.
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
for i in range {
let b = unsafe { *(ptr.add(i)) };
assert_eq!(expected, b, "unexpected byte at offset {i}");
}
}
/// Write 'b' to all bytes in the given range
fn write_range(ptr: *mut u8, b: u8, range: Range<usize>) {
unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) };
}
// simple single-process test of growing and shrinking
#[test]
fn test_shmem_resize() -> Result<(), Error> {
let max_size = 1024 * 1024;
let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?;
assert_eq!(init_struct.current_size(), 0);
// Initial grow
let size1 = 10000;
init_struct.set_size(size1).unwrap();
assert_eq!(init_struct.current_size(), size1);
// Write some data
let data_ptr = init_struct.data_ptr.as_ptr();
write_range(data_ptr, 0xAA, 0..size1);
assert_range(data_ptr, 0xAA, 0..size1);
// Shrink
let size2 = 5000;
init_struct.set_size(size2).unwrap();
assert_eq!(init_struct.current_size(), size2);
// Grow again
let size3 = 20000;
init_struct.set_size(size3).unwrap();
assert_eq!(init_struct.current_size(), size3);
// Try to read it. The area that was shrunk and grown again should read as all zeros now
assert_range(data_ptr, 0xAA, 0..5000);
assert_range(data_ptr, 0, 5000..size1);
// Try to grow beyond max_size
//let size4 = max_size + 1;
//assert!(init_struct.set_size(size4).is_err());
// Dropping init_struct should unmap the memory
drop(init_struct);
Ok(())
}
/// This is used in tests to coordinate between test processes. It's like std::sync::Barrier,
/// but is stored in the shared memory area and works across processes. It's implemented by
/// polling, because e.g. standard rust mutexes are not guaranteed to work across processes.
struct SimpleBarrier {
num_procs: usize,
count: AtomicUsize,
}
impl SimpleBarrier {
unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) {
unsafe {
*ptr = SimpleBarrier {
num_procs,
count: AtomicUsize::new(0),
}
}
}
pub fn wait(&self) {
let old = self.count.fetch_add(1, Ordering::Relaxed);
let generation = old / self.num_procs;
let mut current = old + 1;
while current < (generation + 1) * self.num_procs {
std::thread::sleep(std::time::Duration::from_millis(10));
current = self.count.load(Ordering::Relaxed);
}
}
}
#[test]
fn test_multi_process() {
// Initialize
let max_size = 1_000_000_000_000;
let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap();
let ptr = init_struct.data_ptr.as_ptr();
// Store the SimpleBarrier in the first 1k of the area.
init_struct.set_size(10000).unwrap();
let barrier_ptr: *mut SimpleBarrier = unsafe {
ptr.add(ptr.align_offset(std::mem::align_of::<SimpleBarrier>()))
.cast()
};
unsafe { SimpleBarrier::init(barrier_ptr, 2) };
let barrier = unsafe { barrier_ptr.as_ref().unwrap() };
// Fork another test process. The code after this runs in both processes concurrently.
let fork_result = unsafe { nix::unistd::fork().unwrap() };
// In the parent, fill bytes between 1000..2000. In the child, between 2000..3000
if fork_result.is_parent() {
write_range(ptr, 0xAA, 1000..2000);
} else {
write_range(ptr, 0xBB, 2000..3000);
}
barrier.wait();
// Verify the contents. (in both processes)
assert_range(ptr, 0xAA, 1000..2000);
assert_range(ptr, 0xBB, 2000..3000);
// Grow, from the child this time
let size = 10_000_000;
if !fork_result.is_parent() {
init_struct.set_size(size).unwrap();
}
barrier.wait();
// make some writes at the end
if fork_result.is_parent() {
write_range(ptr, 0xAA, (size - 10)..size);
} else {
write_range(ptr, 0xBB, (size - 20)..(size - 10));
}
barrier.wait();
// Verify the contents. (This runs in both processes)
assert_range(ptr, 0, (size - 1000)..(size - 20));
assert_range(ptr, 0xBB, (size - 20)..(size - 10));
assert_range(ptr, 0xAA, (size - 10)..size);
if let ForkResult::Parent { child } = fork_result {
nix::sys::wait::waitpid(child, None).unwrap();
}
}
}
pub mod shmem;

View File

@@ -0,0 +1,409 @@
//! Dynamically resizable contiguous chunk of shared memory
use std::num::NonZeroUsize;
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use nix::errno::Errno;
use nix::sys::mman::MapFlags;
use nix::sys::mman::ProtFlags;
use nix::sys::mman::mmap as nix_mmap;
use nix::sys::mman::munmap as nix_munmap;
use nix::unistd::ftruncate as nix_ftruncate;
/// `ShmemHandle` represents a shared memory area that can be shared by processes over `fork()`.
/// Unlike shared memory allocated by Postgres, this area is resizable, up to `max_size` that's
/// specified at creation.
///
/// The area is backed by an anonymous file created with `memfd_create()`. The full address space for
/// `max_size` is reserved up-front with `mmap()`, but whenever you call [`ShmemHandle::set_size`],
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
/// will cause the file to be expanded, but we might use `mprotect()` etc. to enforce that in the
/// future.
pub struct ShmemHandle {
/// memfd file descriptor
fd: OwnedFd,
max_size: usize,
// Pointer to the beginning of the shared memory area. The header is stored there.
shared_ptr: NonNull<SharedStruct>,
// Pointer to the beginning of the user data
pub data_ptr: NonNull<u8>,
}
/// This is stored at the beginning in the shared memory area.
struct SharedStruct {
max_size: usize,
/// Current size of the backing file. The high-order bit is used for the [`RESIZE_IN_PROGRESS`] flag.
current_size: AtomicUsize,
}
const RESIZE_IN_PROGRESS: usize = 1 << 63;
const HEADER_SIZE: usize = std::mem::size_of::<SharedStruct>();
/// Error type returned by the [`ShmemHandle`] functions.
#[derive(thiserror::Error, Debug)]
#[error("{msg}: {errno}")]
pub struct Error {
pub msg: String,
pub errno: Errno,
}
impl Error {
fn new(msg: &str, errno: Errno) -> Self {
Self {
msg: msg.to_string(),
errno,
}
}
}
impl ShmemHandle {
/// Create a new shared memory area. To communicate between processes, the processes need to be
/// `fork()`'d after calling this, so that the `ShmemHandle` is inherited by all processes.
///
/// If the `ShmemHandle` is dropped, the memory is unmapped from the current process. Other
/// processes can continue using it, however.
pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result<Self, Error> {
// create the backing anonymous file.
let fd = create_backing_file(name)?;
Self::new_with_fd(fd, initial_size, max_size)
}
fn new_with_fd(fd: OwnedFd, initial_size: usize, max_size: usize) -> Result<Self, Error> {
// We reserve the high-order bit for the `RESIZE_IN_PROGRESS` flag, and the actual size
// is a little larger than this because of the SharedStruct header. Make the upper limit
// somewhat smaller than that, because with anything close to that, you'll run out of
// memory anyway.
assert!(max_size < 1 << 48, "max size {max_size} too large");
assert!(
initial_size <= max_size,
"initial size {initial_size} larger than max size {max_size}"
);
// The actual initial / max size is the one given by the caller, plus the size of
// 'SharedStruct'.
let initial_size = HEADER_SIZE + initial_size;
let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap();
// Reserve address space for it with mmap
//
// TODO: Use MAP_HUGETLB if possible
let start_ptr = unsafe {
nix_mmap(
None,
max_size,
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
MapFlags::MAP_SHARED,
&fd,
0,
)
}
.map_err(|e| Error::new("mmap failed", e))?;
// Reserve space for the initial size
enlarge_file(fd.as_fd(), initial_size as u64)?;
// Initialize the header
let shared: NonNull<SharedStruct> = start_ptr.cast();
unsafe {
shared.write(SharedStruct {
max_size: max_size.into(),
current_size: AtomicUsize::new(initial_size),
});
}
// The user data begins after the header
let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) };
Ok(Self {
fd,
max_size: max_size.into(),
shared_ptr: shared,
data_ptr,
})
}
// return reference to the header
fn shared(&self) -> &SharedStruct {
unsafe { self.shared_ptr.as_ref() }
}
/// Resize the shared memory area. `new_size` must not be larger than the `max_size` specified
/// when creating the area.
///
/// This may only be called from one process/thread concurrently. We detect that case
/// and return an [`shmem::Error`](Error).
pub fn set_size(&self, new_size: usize) -> Result<(), Error> {
let new_size = new_size + HEADER_SIZE;
let shared = self.shared();
assert!(
new_size <= self.max_size,
"new size ({new_size}) is greater than max size ({})",
self.max_size
);
assert_eq!(self.max_size, shared.max_size);
// Lock the area by setting the bit in `current_size`
//
// Ordering::Relaxed would probably be sufficient here, as we don't access any other memory
// and the `posix_fallocate`/`ftruncate` call is surely a synchronization point anyway. But
// since this is not performance-critical, better safe than sorry.
let mut old_size = shared.current_size.load(Ordering::Acquire);
loop {
if (old_size & RESIZE_IN_PROGRESS) != 0 {
return Err(Error::new(
"concurrent resize detected",
Errno::UnknownErrno,
));
}
match shared.current_size.compare_exchange(
old_size,
new_size,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => old_size = x,
}
}
// Ok, we got the lock.
//
// NB: If anything goes wrong, we *must* clear the bit!
let result = {
use std::cmp::Ordering::{Equal, Greater, Less};
match new_size.cmp(&old_size) {
Less => nix_ftruncate(&self.fd, new_size as i64)
.map_err(|e| Error::new("could not shrink shmem segment, ftruncate failed", e)),
Equal => Ok(()),
Greater => enlarge_file(self.fd.as_fd(), new_size as u64),
}
};
// Unlock
shared.current_size.store(
if result.is_ok() { new_size } else { old_size },
Ordering::Release,
);
result
}
/// Returns the current user-visible size of the shared memory segment.
///
/// NOTE: a concurrent [`ShmemHandle::set_size()`] call can change the size at any time.
/// It is the caller's responsibility not to access the area beyond the current size.
pub fn current_size(&self) -> usize {
let total_current_size =
self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS;
total_current_size - HEADER_SIZE
}
}
impl Drop for ShmemHandle {
fn drop(&mut self) {
// SAFETY: The pointer was obtained from mmap() with the given size.
// We unmap the entire region.
let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) };
// The fd is dropped automatically by OwnedFd.
}
}
/// Create a "backing file" for the shared memory area. On Linux, use `memfd_create()`, to create an
/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for
/// development and testing, but in production we want the file to stay in memory.
///
/// Disable unused variables warnings because `name` is unused in the macos path.
#[allow(unused_variables)]
fn create_backing_file(name: &str) -> Result<OwnedFd, Error> {
#[cfg(not(target_os = "macos"))]
{
nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty())
.map_err(|e| Error::new("memfd_create failed", e))
}
#[cfg(target_os = "macos")]
{
let file = tempfile::tempfile().map_err(|e| {
Error::new(
"could not create temporary file to back shmem area",
nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)),
)
})?;
Ok(OwnedFd::from(file))
}
}
fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> {
// Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that
// we don't get a segfault later when trying to actually use it.
#[cfg(not(target_os = "macos"))]
{
nix::fcntl::posix_fallocate(fd, 0, size as i64)
.map_err(|e| Error::new("could not grow shmem segment, posix_fallocate failed", e))
}
// As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate'
#[cfg(target_os = "macos")]
{
nix::unistd::ftruncate(fd, size as i64)
.map_err(|e| Error::new("could not grow shmem segment, ftruncate failed", e))
}
}
#[cfg(test)]
mod tests {
use super::*;
use nix::unistd::ForkResult;
use std::ops::Range;
/// check that all bytes in given range have the expected value.
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
for i in range {
let b = unsafe { *(ptr.add(i)) };
assert_eq!(expected, b, "unexpected byte at offset {i}");
}
}
/// Write 'b' to all bytes in the given range
fn write_range(ptr: *mut u8, b: u8, range: Range<usize>) {
unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) };
}
// simple single-process test of growing and shrinking
#[test]
fn test_shmem_resize() -> Result<(), Error> {
let max_size = 1024 * 1024;
let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?;
assert_eq!(init_struct.current_size(), 0);
// Initial grow
let size1 = 10000;
init_struct.set_size(size1).unwrap();
assert_eq!(init_struct.current_size(), size1);
// Write some data
let data_ptr = init_struct.data_ptr.as_ptr();
write_range(data_ptr, 0xAA, 0..size1);
assert_range(data_ptr, 0xAA, 0..size1);
// Shrink
let size2 = 5000;
init_struct.set_size(size2).unwrap();
assert_eq!(init_struct.current_size(), size2);
// Grow again
let size3 = 20000;
init_struct.set_size(size3).unwrap();
assert_eq!(init_struct.current_size(), size3);
// Try to read it. The area that was shrunk and grown again should read as all zeros now
assert_range(data_ptr, 0xAA, 0..5000);
assert_range(data_ptr, 0, 5000..size1);
// Try to grow beyond max_size
//let size4 = max_size + 1;
//assert!(init_struct.set_size(size4).is_err());
// Dropping init_struct should unmap the memory
drop(init_struct);
Ok(())
}
/// This is used in tests to coordinate between test processes. It's like `std::sync::Barrier`,
/// but is stored in the shared memory area and works across processes. It's implemented by
/// polling, because e.g. standard rust mutexes are not guaranteed to work across processes.
struct SimpleBarrier {
num_procs: usize,
count: AtomicUsize,
}
impl SimpleBarrier {
unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) {
unsafe {
*ptr = SimpleBarrier {
num_procs,
count: AtomicUsize::new(0),
}
}
}
pub fn wait(&self) {
let old = self.count.fetch_add(1, Ordering::Relaxed);
let generation = old / self.num_procs;
let mut current = old + 1;
while current < (generation + 1) * self.num_procs {
std::thread::sleep(std::time::Duration::from_millis(10));
current = self.count.load(Ordering::Relaxed);
}
}
}
#[test]
fn test_multi_process() {
// Initialize
let max_size = 1_000_000_000_000;
let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap();
let ptr = init_struct.data_ptr.as_ptr();
// Store the SimpleBarrier in the first 1k of the area.
init_struct.set_size(10000).unwrap();
let barrier_ptr: *mut SimpleBarrier = unsafe {
ptr.add(ptr.align_offset(std::mem::align_of::<SimpleBarrier>()))
.cast()
};
unsafe { SimpleBarrier::init(barrier_ptr, 2) };
let barrier = unsafe { barrier_ptr.as_ref().unwrap() };
// Fork another test process. The code after this runs in both processes concurrently.
let fork_result = unsafe { nix::unistd::fork().unwrap() };
// In the parent, fill bytes between 1000..2000. In the child, between 2000..3000
if fork_result.is_parent() {
write_range(ptr, 0xAA, 1000..2000);
} else {
write_range(ptr, 0xBB, 2000..3000);
}
barrier.wait();
// Verify the contents. (in both processes)
assert_range(ptr, 0xAA, 1000..2000);
assert_range(ptr, 0xBB, 2000..3000);
// Grow, from the child this time
let size = 10_000_000;
if !fork_result.is_parent() {
init_struct.set_size(size).unwrap();
}
barrier.wait();
// make some writes at the end
if fork_result.is_parent() {
write_range(ptr, 0xAA, (size - 10)..size);
} else {
write_range(ptr, 0xBB, (size - 20)..(size - 10));
}
barrier.wait();
// Verify the contents. (This runs in both processes)
assert_range(ptr, 0, (size - 1000)..(size - 20));
assert_range(ptr, 0xBB, (size - 20)..(size - 10));
assert_range(ptr, 0xAA, (size - 10)..size);
if let ForkResult::Parent { child } = fork_result {
nix::sys::wait::waitpid(child, None).unwrap();
}
}
}

View File

@@ -38,6 +38,7 @@ reqwest.workspace = true
rand.workspace = true
tracing.workspace = true
tracing-utils.workspace = true
once_cell.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -110,7 +110,6 @@ fn main() -> anyhow::Result<()> {
.allowlist_type("XLogRecPtr")
.allowlist_type("XLogSegNo")
.allowlist_type("TimeLineID")
.allowlist_type("TimestampTz")
.allowlist_type("MultiXactId")
.allowlist_type("MultiXactOffset")
.allowlist_type("MultiXactStatus")

View File

@@ -227,8 +227,7 @@ pub mod walrecord;
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::{
BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
uint64,
RepOriginId, TimeLineID, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32, uint64,
};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};

View File

@@ -4,13 +4,14 @@
//! TODO: Generate separate types for each supported PG version
use bytes::{Buf, Bytes};
use postgres_ffi_types::TimestampTz;
use serde::{Deserialize, Serialize};
use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;
use crate::{
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, PgMajorVersion,
RepOriginId, TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
RepOriginId, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
};
#[repr(C)]
@@ -863,7 +864,8 @@ pub mod v17 {
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
rm_neon,
};
pub use crate::{TimeLineID, TimestampTz};
pub use crate::TimeLineID;
pub use postgres_ffi_types::TimestampTz;
#[repr(C)]
#[derive(Debug)]

View File

@@ -9,10 +9,11 @@
use super::super::waldecoder::WalStreamDecoder;
use super::bindings::{
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID,
XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
MY_PGVERSION
};
use postgres_ffi_types::TimestampTz;
use super::wal_generator::LogicalMessageGenerator;
use crate::pg_constants;
use crate::PG_TLI;

View File

@@ -11,3 +11,4 @@ pub mod forknum;
pub type Oid = u32;
pub type RepOriginId = u16;
pub type TimestampTz = i64;

View File

@@ -5,6 +5,7 @@ edition = "2024"
license.workspace = true
[dependencies]
anyhow.workspace = true
thiserror.workspace = true
serde.workspace = true
serde_repr.workspace = true

View File

@@ -14,5 +14,6 @@ sha2.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-util.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
workspace_hack.workspace = true

View File

@@ -9,7 +9,7 @@ anyhow.workspace = true
const_format.workspace = true
serde.workspace = true
serde_json.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
postgres_versioninfo.workspace = true
pq_proto.workspace = true
tokio.workspace = true

View File

@@ -3,7 +3,7 @@
use std::net::SocketAddr;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::TimestampTz;
use postgres_ffi_types::TimestampTz;
use postgres_versioninfo::PgVersionId;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;

View File

@@ -33,6 +33,7 @@ pem.workspace = true
pin-project-lite.workspace = true
regex.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
thiserror.workspace = true

View File

@@ -2,7 +2,8 @@
use bytes::Bytes;
use postgres_ffi::walrecord::{MultiXactMember, describe_postgres_wal_record};
use postgres_ffi::{MultiXactId, MultiXactOffset, TimestampTz, TransactionId};
use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId};
use postgres_ffi_types::TimestampTz;
use serde::{Deserialize, Serialize};
use utils::bin_ser::DeserializeError;

View File

@@ -431,7 +431,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
should_limit: crate::bindings::pg_atomic_uint32 { value: 0 },
sent_bytes: 0,
last_recorded_time_us: 0,
last_recorded_time_us: crate::bindings::pg_atomic_uint64 { value: 0 },
};
crate::bindings::WalproposerShmemState {

View File

@@ -55,6 +55,7 @@ pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor t
pageserver_compaction.workspace = true
pageserver_page_api.workspace = true
pem.workspace = true
pin-project-lite.workspace = true
postgres_backend.workspace = true
postgres_connection.workspace = true
postgres_ffi.workspace = true

View File

@@ -1,5 +1,5 @@
//! The validator is responsible for validating DeletionLists for execution,
//! based on whethe the generation in the DeletionList is still the latest
//! based on whether the generation in the DeletionList is still the latest
//! generation for a tenant.
//!
//! The purpose of validation is to ensure split-brain safety in the cluster

View File

@@ -25,9 +25,9 @@ use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
use pageserver_api::models::RelSizeMigration;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::{BLCKSZ, PgMajorVersion, TimestampTz, TransactionId};
use postgres_ffi::{BLCKSZ, PgMajorVersion, TransactionId};
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi_types::{Oid, RepOriginId};
use postgres_ffi_types::{Oid, RepOriginId, TimestampTz};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;

View File

@@ -834,6 +834,10 @@ impl TenantManager {
mut spawn_mode: SpawnMode,
ctx: &RequestContext,
) -> Result<Option<Arc<TenantShard>>, UpsertLocationError> {
fail::fail_point!("upsert-location", |_| {
return Ok(None);
});
debug_assert_current_span_has_tenant_id();
info!("configuring tenant location to state {new_location_config:?}");

View File

@@ -184,7 +184,7 @@ pub(super) async fn connection_manager_loop_step(
// If we've not received any updates from the broker from a while, are waiting for WAL
// and have no safekeeper connection or connection candidates, then it might be that
// the broker subscription is wedged. Drop the currrent subscription and re-subscribe
// the broker subscription is wedged. Drop the current subscription and re-subscribe
// with the goal of unblocking it.
_ = broker_reset_interval.tick() => {
let awaiting_lsn = wait_lsn_status.borrow().is_some();
@@ -192,7 +192,7 @@ pub(super) async fn connection_manager_loop_step(
let no_connection = connection_manager_state.wal_connection.is_none();
if awaiting_lsn && no_candidates && no_connection {
tracing::warn!("No broker updates received for a while, but waiting for WAL. Re-setting stream ...");
tracing::info!("No broker updates received for a while, but waiting for WAL. Re-setting stream ...");
broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
}
},

View File

@@ -1,6 +1,6 @@
//! An utilization metric which is used to decide on which pageserver to put next tenant.
//!
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain it's openapi spec as the
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain its openapi spec as the
//! truth.
use std::path::Path;

View File

@@ -32,9 +32,10 @@ use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::walrecord::*;
use postgres_ffi::{
PgMajorVersion, TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion,
enum_pgversion_dispatch, fsm_logical_to_physical, pg_constants,
PgMajorVersion, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
fsm_logical_to_physical, pg_constants,
};
use postgres_ffi_types::TimestampTz;
use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use tracing::*;
use utils::bin_ser::{DeserializeError, SerializeError};
@@ -1069,7 +1070,7 @@ impl WalIngest {
// NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
// go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
// read it, like GetNewMultiXactId(). This is different from how nextXid is
// incremented! nextXid skips over < FirstNormalTransactionId when the the value
// incremented! nextXid skips over < FirstNormalTransactionId when the value
// is stored, so it's never 0 in a checkpoint.
//
// I don't know why it's done that way, it seems less error-prone to skip over 0

View File

@@ -543,6 +543,15 @@ _PG_init(void)
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.privileged_role_name",
"Name of the 'weak' superuser role, which we give to the users",
NULL,
&privileged_role_name,
"neon_superuser",
PGC_POSTMASTER, 0, NULL, NULL, NULL);
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the

View File

@@ -16,7 +16,6 @@
extern char *neon_auth_token;
extern char *neon_timeline;
extern char *neon_tenant;
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;

View File

@@ -13,7 +13,7 @@
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* Support event triggers for neon_superuser
* Support event triggers for {privileged_role_name}
*
* IDENTIFICATION
* contrib/neon/neon_dll_handler.c
@@ -49,6 +49,7 @@
#include "neon_ddl_handler.h"
#include "neon_utils.h"
#include "neon.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
static fmgr_hook_type next_fmgr_hook = NULL;
@@ -541,11 +542,11 @@ NeonXactCallback(XactEvent event, void *arg)
}
static bool
RoleIsNeonSuperuser(const char *role_name)
IsPrivilegedRole(const char *role_name)
{
Assert(role_name);
return strcmp(role_name, "neon_superuser") == 0;
return strcmp(role_name, privileged_role_name) == 0;
}
static void
@@ -578,8 +579,9 @@ HandleCreateDb(CreatedbStmt *stmt)
{
const char *owner_name = defGetString(downer);
if (RoleIsNeonSuperuser(owner_name))
elog(ERROR, "can't create a database with owner neon_superuser");
if (IsPrivilegedRole(owner_name))
elog(ERROR, "could not create a database with owner %s", privileged_role_name);
entry->owner = get_role_oid(owner_name, false);
}
else
@@ -609,8 +611,9 @@ HandleAlterOwner(AlterOwnerStmt *stmt)
memset(entry->old_name, 0, sizeof(entry->old_name));
new_owner = get_rolespec_name(stmt->newowner);
if (RoleIsNeonSuperuser(new_owner))
elog(ERROR, "can't alter owner to neon_superuser");
if (IsPrivilegedRole(new_owner))
elog(ERROR, "could not alter owner to %s", privileged_role_name);
entry->owner = get_role_oid(new_owner, false);
entry->type = Op_Set;
}
@@ -716,8 +719,8 @@ HandleAlterRole(AlterRoleStmt *stmt)
InitRoleTableIfNeeded();
role_name = get_rolespec_name(stmt->role);
if (RoleIsNeonSuperuser(role_name) && !superuser())
elog(ERROR, "can't ALTER neon_superuser");
if (IsPrivilegedRole(role_name) && !superuser())
elog(ERROR, "could not ALTER %s", privileged_role_name);
dpass = NULL;
foreach(option, stmt->options)
@@ -831,7 +834,7 @@ HandleRename(RenameStmt *stmt)
*
* In vanilla only superuser can create Event Triggers.
*
* We allow it for neon_superuser by temporary switching to superuser. But as
* We allow it for {privileged_role_name} by temporary switching to superuser. But as
* far as event trigger can fire in superuser context we should protect
* superuser from execution of arbitrary user's code.
*
@@ -891,7 +894,7 @@ force_noop(FmgrInfo *finfo)
* Also skip executing Event Triggers when GUC neon.event_triggers has been
* set to false. This might be necessary to be able to connect again after a
* LOGIN Event Trigger has been installed that would prevent connections as
* neon_superuser.
* {privileged_role_name}.
*/
static void
neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
@@ -910,24 +913,24 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
}
/*
* The neon_superuser role can use the GUC neon.event_triggers to disable
* The {privileged_role_name} role can use the GUC neon.event_triggers to disable
* firing Event Trigger.
*
* SET neon.event_triggers TO false;
*
* This only applies to the neon_superuser role though, and only allows
* skipping Event Triggers owned by neon_superuser, which we check by
* proxy of the Event Trigger function being owned by neon_superuser.
* This only applies to the {privileged_role_name} role though, and only allows
* skipping Event Triggers owned by {privileged_role_name}, which we check by
* proxy of the Event Trigger function being owned by {privileged_role_name}.
*
* A role that is created in role neon_superuser should be allowed to also
* A role that is created in role {privileged_role_name} should be allowed to also
* benefit from the neon_event_triggers GUC, and will be considered the
* same as the neon_superuser role.
* same as the {privileged_role_name} role.
*/
if (event == FHET_START
&& !neon_event_triggers
&& is_neon_superuser())
&& is_privileged_role())
{
Oid neon_superuser_oid = get_role_oid("neon_superuser", false);
Oid weak_superuser_oid = get_role_oid(privileged_role_name, false);
/* Find the Function Attributes (owner Oid, security definer) */
const char *fun_owner_name = NULL;
@@ -937,8 +940,8 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
LookupFuncOwnerSecDef(flinfo->fn_oid, &fun_owner, &fun_is_secdef);
fun_owner_name = GetUserNameFromId(fun_owner, false);
if (RoleIsNeonSuperuser(fun_owner_name)
|| has_privs_of_role(fun_owner, neon_superuser_oid))
if (IsPrivilegedRole(fun_owner_name)
|| has_privs_of_role(fun_owner, weak_superuser_oid))
{
elog(WARNING,
"Skipping Event Trigger: neon.event_triggers is false");
@@ -1149,13 +1152,13 @@ ProcessCreateEventTrigger(
}
/*
* Allow neon_superuser to create Event Trigger, while keeping the
* Allow {privileged_role_name} to create Event Trigger, while keeping the
* ownership of the object.
*
* For that we give superuser membership to the role for the execution of
* the command.
*/
if (IsTransactionState() && is_neon_superuser())
if (IsTransactionState() && is_privileged_role())
{
/* Find the Event Trigger function Oid */
Oid func_oid = LookupFuncName(stmt->funcname, 0, NULL, false);
@@ -1232,7 +1235,7 @@ ProcessCreateEventTrigger(
*
* That way [ ALTER | DROP ] EVENT TRIGGER commands just work.
*/
if (IsTransactionState() && is_neon_superuser())
if (IsTransactionState() && is_privileged_role())
{
if (!current_user_is_super)
{
@@ -1352,19 +1355,17 @@ NeonProcessUtility(
}
/*
* Only neon_superuser is granted privilege to edit neon.event_triggers GUC.
* Only {privileged_role_name} is granted privilege to edit neon.event_triggers GUC.
*/
static void
neon_event_triggers_assign_hook(bool newval, void *extra)
{
/* MyDatabaseId == InvalidOid || !OidIsValid(GetUserId()) */
if (IsTransactionState() && !is_neon_superuser())
if (IsTransactionState() && !is_privileged_role())
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to set neon.event_triggers"),
errdetail("Only \"neon_superuser\" is allowed to set the GUC")));
errdetail("Only \"%s\" is allowed to set the GUC", privileged_role_name)));
}
}

View File

@@ -377,6 +377,16 @@ typedef struct PageserverFeedback
} PageserverFeedback;
/* BEGIN_HADRON */
/**
* WAL proposer is the only backend that will update `sent_bytes` and `last_recorded_time_us`.
* Once the `sent_bytes` reaches the limit, it puts backpressure on PG backends.
*
* A PG backend checks `should_limit` to see if it should hit backpressure.
* - If yes, it also checks the `last_recorded_time_us` to see
* if it's time to push more WALs. This is because the WAL proposer
* only resets `should_limit` to 0 after it is notified about new WALs
* which might take a while.
*/
typedef struct WalRateLimiter
{
/* If the value is 1, PG backends will hit backpressure. */
@@ -384,7 +394,7 @@ typedef struct WalRateLimiter
/* The number of bytes sent in the current second. */
uint64 sent_bytes;
/* The last recorded time in microsecond. */
TimestampTz last_recorded_time_us;
pg_atomic_uint64 last_recorded_time_us;
} WalRateLimiter;
/* END_HADRON */

View File

@@ -449,8 +449,20 @@ backpressure_lag_impl(void)
}
state = GetWalpropShmemState();
if (state != NULL && pg_atomic_read_u32(&state->wal_rate_limiter.should_limit) == 1)
if (state != NULL && !!pg_atomic_read_u32(&state->wal_rate_limiter.should_limit))
{
TimestampTz now = GetCurrentTimestamp();
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us);
if (now - last_recorded_time > USECS_PER_SEC)
{
/*
* The backend has past 1 second since the last recorded time and it's time to push more WALs.
* If the backends are pushing WALs too fast, the wal proposer will rate limit them again.
*/
uint32 expected = true;
pg_atomic_compare_exchange_u32(&state->wal_rate_limiter.should_limit, &expected, false);
}
return 1;
}
/* END_HADRON */
@@ -502,6 +514,7 @@ WalproposerShmemInit(void)
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
/* BEGIN_HADRON */
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0);
/* END_HADRON */
}
LWLockRelease(AddinShmemInitLock);
@@ -520,6 +533,7 @@ WalproposerShmemInit_SyncSafekeeper(void)
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
/* BEGIN_HADRON */
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0);
/* END_HADRON */
}
@@ -1551,18 +1565,18 @@ XLogBroadcastWalProposer(WalProposer *wp)
{
uint64 max_wal_bytes = (uint64) databricks_max_wal_mb_per_second * 1024 * 1024;
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
if (now - limiter->last_recorded_time_us > USECS_PER_SEC)
uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us);
if (now - last_recorded_time > USECS_PER_SEC)
{
/* Reset the rate limiter */
limiter->last_recorded_time_us = now;
limiter->sent_bytes = 0;
pg_atomic_exchange_u32(&limiter->should_limit, 0);
pg_atomic_write_u64(&limiter->last_recorded_time_us, now);
pg_atomic_write_u32(&limiter->should_limit, false);
}
limiter->sent_bytes += (endptr - startptr);
if (limiter->sent_bytes > max_wal_bytes)
{
pg_atomic_exchange_u32(&limiter->should_limit, 1);
pg_atomic_write_u32(&limiter->should_limit, true);
}
}
/* END_HADRON */

209
poetry.lock generated
View File

@@ -2,127 +2,123 @@
[[package]]
name = "aiohappyeyeballs"
version = "2.3.5"
version = "2.6.1"
description = "Happy Eyeballs for asyncio"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "aiohappyeyeballs-2.3.5-py3-none-any.whl", hash = "sha256:4d6dea59215537dbc746e93e779caea8178c866856a721c9c660d7a5a7b8be03"},
{file = "aiohappyeyeballs-2.3.5.tar.gz", hash = "sha256:6fa48b9f1317254f122a07a131a86b71ca6946ca989ce6326fff54a99a920105"},
{file = "aiohappyeyeballs-2.6.1-py3-none-any.whl", hash = "sha256:f349ba8f4b75cb25c99c5c2d84e997e485204d2902a9597802b0371f09331fb8"},
{file = "aiohappyeyeballs-2.6.1.tar.gz", hash = "sha256:c3f9d0113123803ccadfdf3f0faa505bc78e6a72d1cc4806cbd719826e943558"},
]
[[package]]
name = "aiohttp"
version = "3.10.11"
version = "3.12.14"
description = "Async http client/server framework (asyncio)"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "aiohttp-3.10.11-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:5077b1a5f40ffa3ba1f40d537d3bec4383988ee51fbba6b74aa8fb1bc466599e"},
{file = "aiohttp-3.10.11-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:8d6a14a4d93b5b3c2891fca94fa9d41b2322a68194422bef0dd5ec1e57d7d298"},
{file = "aiohttp-3.10.11-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ffbfde2443696345e23a3c597049b1dd43049bb65337837574205e7368472177"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:20b3d9e416774d41813bc02fdc0663379c01817b0874b932b81c7f777f67b217"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2b943011b45ee6bf74b22245c6faab736363678e910504dd7531a58c76c9015a"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48bc1d924490f0d0b3658fe5c4b081a4d56ebb58af80a6729d4bd13ea569797a"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e12eb3f4b1f72aaaf6acd27d045753b18101524f72ae071ae1c91c1cd44ef115"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f14ebc419a568c2eff3c1ed35f634435c24ead2fe19c07426af41e7adb68713a"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:72b191cdf35a518bfc7ca87d770d30941decc5aaf897ec8b484eb5cc8c7706f3"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:5ab2328a61fdc86424ee540d0aeb8b73bbcad7351fb7cf7a6546fc0bcffa0038"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:aa93063d4af05c49276cf14e419550a3f45258b6b9d1f16403e777f1addf4519"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_s390x.whl", hash = "sha256:30283f9d0ce420363c24c5c2421e71a738a2155f10adbb1a11a4d4d6d2715cfc"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:e5358addc8044ee49143c546d2182c15b4ac3a60be01c3209374ace05af5733d"},
{file = "aiohttp-3.10.11-cp310-cp310-win32.whl", hash = "sha256:e1ffa713d3ea7cdcd4aea9cddccab41edf6882fa9552940344c44e59652e1120"},
{file = "aiohttp-3.10.11-cp310-cp310-win_amd64.whl", hash = "sha256:778cbd01f18ff78b5dd23c77eb82987ee4ba23408cbed233009fd570dda7e674"},
{file = "aiohttp-3.10.11-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:80ff08556c7f59a7972b1e8919f62e9c069c33566a6d28586771711e0eea4f07"},
{file = "aiohttp-3.10.11-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2c8f96e9ee19f04c4914e4e7a42a60861066d3e1abf05c726f38d9d0a466e695"},
{file = "aiohttp-3.10.11-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:fb8601394d537da9221947b5d6e62b064c9a43e88a1ecd7414d21a1a6fba9c24"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2ea224cf7bc2d8856d6971cea73b1d50c9c51d36971faf1abc169a0d5f85a382"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:db9503f79e12d5d80b3efd4d01312853565c05367493379df76d2674af881caa"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0f449a50cc33f0384f633894d8d3cd020e3ccef81879c6e6245c3c375c448625"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:82052be3e6d9e0c123499127782a01a2b224b8af8c62ab46b3f6197035ad94e9"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:20063c7acf1eec550c8eb098deb5ed9e1bb0521613b03bb93644b810986027ac"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:489cced07a4c11488f47aab1f00d0c572506883f877af100a38f1fedaa884c3a"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:ea9b3bab329aeaa603ed3bf605f1e2a6f36496ad7e0e1aa42025f368ee2dc07b"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:ca117819d8ad113413016cb29774b3f6d99ad23c220069789fc050267b786c16"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_s390x.whl", hash = "sha256:2dfb612dcbe70fb7cdcf3499e8d483079b89749c857a8f6e80263b021745c730"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:f9b615d3da0d60e7d53c62e22b4fd1c70f4ae5993a44687b011ea3a2e49051b8"},
{file = "aiohttp-3.10.11-cp311-cp311-win32.whl", hash = "sha256:29103f9099b6068bbdf44d6a3d090e0a0b2be6d3c9f16a070dd9d0d910ec08f9"},
{file = "aiohttp-3.10.11-cp311-cp311-win_amd64.whl", hash = "sha256:236b28ceb79532da85d59aa9b9bf873b364e27a0acb2ceaba475dc61cffb6f3f"},
{file = "aiohttp-3.10.11-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:7480519f70e32bfb101d71fb9a1f330fbd291655a4c1c922232a48c458c52710"},
{file = "aiohttp-3.10.11-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:f65267266c9aeb2287a6622ee2bb39490292552f9fbf851baabc04c9f84e048d"},
{file = "aiohttp-3.10.11-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:7400a93d629a0608dc1d6c55f1e3d6e07f7375745aaa8bd7f085571e4d1cee97"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f34b97e4b11b8d4eb2c3a4f975be626cc8af99ff479da7de49ac2c6d02d35725"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1e7b825da878464a252ccff2958838f9caa82f32a8dbc334eb9b34a026e2c636"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f9f92a344c50b9667827da308473005f34767b6a2a60d9acff56ae94f895f385"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc6f1ab987a27b83c5268a17218463c2ec08dbb754195113867a27b166cd6087"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1dc0f4ca54842173d03322793ebcf2c8cc2d34ae91cc762478e295d8e361e03f"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:7ce6a51469bfaacff146e59e7fb61c9c23006495d11cc24c514a455032bcfa03"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:aad3cd91d484d065ede16f3cf15408254e2469e3f613b241a1db552c5eb7ab7d"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:f4df4b8ca97f658c880fb4b90b1d1ec528315d4030af1ec763247ebfd33d8b9a"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:2e4e18a0a2d03531edbc06c366954e40a3f8d2a88d2b936bbe78a0c75a3aab3e"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:6ce66780fa1a20e45bc753cda2a149daa6dbf1561fc1289fa0c308391c7bc0a4"},
{file = "aiohttp-3.10.11-cp312-cp312-win32.whl", hash = "sha256:a919c8957695ea4c0e7a3e8d16494e3477b86f33067478f43106921c2fef15bb"},
{file = "aiohttp-3.10.11-cp312-cp312-win_amd64.whl", hash = "sha256:b5e29706e6389a2283a91611c91bf24f218962717c8f3b4e528ef529d112ee27"},
{file = "aiohttp-3.10.11-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:703938e22434d7d14ec22f9f310559331f455018389222eed132808cd8f44127"},
{file = "aiohttp-3.10.11-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:9bc50b63648840854e00084c2b43035a62e033cb9b06d8c22b409d56eb098413"},
{file = "aiohttp-3.10.11-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:5f0463bf8b0754bc744e1feb61590706823795041e63edf30118a6f0bf577461"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f6c6dec398ac5a87cb3a407b068e1106b20ef001c344e34154616183fe684288"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:bcaf2d79104d53d4dcf934f7ce76d3d155302d07dae24dff6c9fffd217568067"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:25fd5470922091b5a9aeeb7e75be609e16b4fba81cdeaf12981393fb240dd10e"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bbde2ca67230923a42161b1f408c3992ae6e0be782dca0c44cb3206bf330dee1"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:249c8ff8d26a8b41a0f12f9df804e7c685ca35a207e2410adbd3e924217b9006"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:878ca6a931ee8c486a8f7b432b65431d095c522cbeb34892bee5be97b3481d0f"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:8663f7777ce775f0413324be0d96d9730959b2ca73d9b7e2c2c90539139cbdd6"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:6cd3f10b01f0c31481fba8d302b61603a2acb37b9d30e1d14e0f5a58b7b18a31"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:4e8d8aad9402d3aa02fdc5ca2fe68bcb9fdfe1f77b40b10410a94c7f408b664d"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:38e3c4f80196b4f6c3a85d134a534a56f52da9cb8d8e7af1b79a32eefee73a00"},
{file = "aiohttp-3.10.11-cp313-cp313-win32.whl", hash = "sha256:fc31820cfc3b2863c6e95e14fcf815dc7afe52480b4dc03393c4873bb5599f71"},
{file = "aiohttp-3.10.11-cp313-cp313-win_amd64.whl", hash = "sha256:4996ff1345704ffdd6d75fb06ed175938c133425af616142e7187f28dc75f14e"},
{file = "aiohttp-3.10.11-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:74baf1a7d948b3d640badeac333af581a367ab916b37e44cf90a0334157cdfd2"},
{file = "aiohttp-3.10.11-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:473aebc3b871646e1940c05268d451f2543a1d209f47035b594b9d4e91ce8339"},
{file = "aiohttp-3.10.11-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:c2f746a6968c54ab2186574e15c3f14f3e7f67aef12b761e043b33b89c5b5f95"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d110cabad8360ffa0dec8f6ec60e43286e9d251e77db4763a87dcfe55b4adb92"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e0099c7d5d7afff4202a0c670e5b723f7718810000b4abcbc96b064129e64bc7"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0316e624b754dbbf8c872b62fe6dcb395ef20c70e59890dfa0de9eafccd2849d"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5a5f7ab8baf13314e6b2485965cbacb94afff1e93466ac4d06a47a81c50f9cca"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c891011e76041e6508cbfc469dd1a8ea09bc24e87e4c204e05f150c4c455a5fa"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:9208299251370ee815473270c52cd3f7069ee9ed348d941d574d1457d2c73e8b"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:459f0f32c8356e8125f45eeff0ecf2b1cb6db1551304972702f34cd9e6c44658"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_ppc64le.whl", hash = "sha256:14cdc8c1810bbd4b4b9f142eeee23cda528ae4e57ea0923551a9af4820980e39"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_s390x.whl", hash = "sha256:971aa438a29701d4b34e4943e91b5e984c3ae6ccbf80dd9efaffb01bd0b243a9"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:9a309c5de392dfe0f32ee57fa43ed8fc6ddf9985425e84bd51ed66bb16bce3a7"},
{file = "aiohttp-3.10.11-cp38-cp38-win32.whl", hash = "sha256:9ec1628180241d906a0840b38f162a3215114b14541f1a8711c368a8739a9be4"},
{file = "aiohttp-3.10.11-cp38-cp38-win_amd64.whl", hash = "sha256:9c6e0ffd52c929f985c7258f83185d17c76d4275ad22e90aa29f38e211aacbec"},
{file = "aiohttp-3.10.11-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:cdc493a2e5d8dc79b2df5bec9558425bcd39aff59fc949810cbd0832e294b106"},
{file = "aiohttp-3.10.11-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:b3e70f24e7d0405be2348da9d5a7836936bf3a9b4fd210f8c37e8d48bc32eca6"},
{file = "aiohttp-3.10.11-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:968b8fb2a5eee2770eda9c7b5581587ef9b96fbdf8dcabc6b446d35ccc69df01"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:deef4362af9493d1382ef86732ee2e4cbc0d7c005947bd54ad1a9a16dd59298e"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:686b03196976e327412a1b094f4120778c7c4b9cff9bce8d2fdfeca386b89829"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3bf6d027d9d1d34e1c2e1645f18a6498c98d634f8e373395221121f1c258ace8"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:099fd126bf960f96d34a760e747a629c27fb3634da5d05c7ef4d35ef4ea519fc"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c73c4d3dae0b4644bc21e3de546530531d6cdc88659cdeb6579cd627d3c206aa"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:0c5580f3c51eea91559db3facd45d72e7ec970b04528b4709b1f9c2555bd6d0b"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:fdf6429f0caabfd8a30c4e2eaecb547b3c340e4730ebfe25139779b9815ba138"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:d97187de3c276263db3564bb9d9fad9e15b51ea10a371ffa5947a5ba93ad6777"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_s390x.whl", hash = "sha256:0acafb350cfb2eba70eb5d271f55e08bd4502ec35e964e18ad3e7d34d71f7261"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:c13ed0c779911c7998a58e7848954bd4d63df3e3575f591e321b19a2aec8df9f"},
{file = "aiohttp-3.10.11-cp39-cp39-win32.whl", hash = "sha256:22b7c540c55909140f63ab4f54ec2c20d2635c0289cdd8006da46f3327f971b9"},
{file = "aiohttp-3.10.11-cp39-cp39-win_amd64.whl", hash = "sha256:7b26b1551e481012575dab8e3727b16fe7dd27eb2711d2e63ced7368756268fb"},
{file = "aiohttp-3.10.11.tar.gz", hash = "sha256:9dc2b8f3dcab2e39e0fa309c8da50c3b55e6f34ab25f1a71d3288f24924d33a7"},
{file = "aiohttp-3.12.14-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:906d5075b5ba0dd1c66fcaaf60eb09926a9fef3ca92d912d2a0bbdbecf8b1248"},
{file = "aiohttp-3.12.14-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c875bf6fc2fd1a572aba0e02ef4e7a63694778c5646cdbda346ee24e630d30fb"},
{file = "aiohttp-3.12.14-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fbb284d15c6a45fab030740049d03c0ecd60edad9cd23b211d7e11d3be8d56fd"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:38e360381e02e1a05d36b223ecab7bc4a6e7b5ab15760022dc92589ee1d4238c"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:aaf90137b5e5d84a53632ad95ebee5c9e3e7468f0aab92ba3f608adcb914fa95"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e532a25e4a0a2685fa295a31acf65e027fbe2bea7a4b02cdfbbba8a064577663"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:eab9762c4d1b08ae04a6c77474e6136da722e34fdc0e6d6eab5ee93ac29f35d1"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:abe53c3812b2899889a7fca763cdfaeee725f5be68ea89905e4275476ffd7e61"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5760909b7080aa2ec1d320baee90d03b21745573780a072b66ce633eb77a8656"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:02fcd3f69051467bbaa7f84d7ec3267478c7df18d68b2e28279116e29d18d4f3"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_armv7l.whl", hash = "sha256:4dcd1172cd6794884c33e504d3da3c35648b8be9bfa946942d353b939d5f1288"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:224d0da41355b942b43ad08101b1b41ce633a654128ee07e36d75133443adcda"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:e387668724f4d734e865c1776d841ed75b300ee61059aca0b05bce67061dcacc"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_s390x.whl", hash = "sha256:dec9cde5b5a24171e0b0a4ca064b1414950904053fb77c707efd876a2da525d8"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:bbad68a2af4877cc103cd94af9160e45676fc6f0c14abb88e6e092b945c2c8e3"},
{file = "aiohttp-3.12.14-cp310-cp310-win32.whl", hash = "sha256:ee580cb7c00bd857b3039ebca03c4448e84700dc1322f860cf7a500a6f62630c"},
{file = "aiohttp-3.12.14-cp310-cp310-win_amd64.whl", hash = "sha256:cf4f05b8cea571e2ccc3ca744e35ead24992d90a72ca2cf7ab7a2efbac6716db"},
{file = "aiohttp-3.12.14-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:f4552ff7b18bcec18b60a90c6982049cdb9dac1dba48cf00b97934a06ce2e597"},
{file = "aiohttp-3.12.14-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:8283f42181ff6ccbcf25acaae4e8ab2ff7e92b3ca4a4ced73b2c12d8cd971393"},
{file = "aiohttp-3.12.14-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:040afa180ea514495aaff7ad34ec3d27826eaa5d19812730fe9e529b04bb2179"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b413c12f14c1149f0ffd890f4141a7471ba4b41234fe4fd4a0ff82b1dc299dbb"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:1d6f607ce2e1a93315414e3d448b831238f1874b9968e1195b06efaa5c87e245"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:565e70d03e924333004ed101599902bba09ebb14843c8ea39d657f037115201b"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4699979560728b168d5ab63c668a093c9570af2c7a78ea24ca5212c6cdc2b641"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ad5fdf6af93ec6c99bf800eba3af9a43d8bfd66dce920ac905c817ef4a712afe"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4ac76627c0b7ee0e80e871bde0d376a057916cb008a8f3ffc889570a838f5cc7"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:798204af1180885651b77bf03adc903743a86a39c7392c472891649610844635"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_armv7l.whl", hash = "sha256:4f1205f97de92c37dd71cf2d5bcfb65fdaed3c255d246172cce729a8d849b4da"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:76ae6f1dd041f85065d9df77c6bc9c9703da9b5c018479d20262acc3df97d419"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:a194ace7bc43ce765338ca2dfb5661489317db216ea7ea700b0332878b392cab"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_s390x.whl", hash = "sha256:16260e8e03744a6fe3fcb05259eeab8e08342c4c33decf96a9dad9f1187275d0"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:8c779e5ebbf0e2e15334ea404fcce54009dc069210164a244d2eac8352a44b28"},
{file = "aiohttp-3.12.14-cp311-cp311-win32.whl", hash = "sha256:a289f50bf1bd5be227376c067927f78079a7bdeccf8daa6a9e65c38bae14324b"},
{file = "aiohttp-3.12.14-cp311-cp311-win_amd64.whl", hash = "sha256:0b8a69acaf06b17e9c54151a6c956339cf46db4ff72b3ac28516d0f7068f4ced"},
{file = "aiohttp-3.12.14-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:a0ecbb32fc3e69bc25efcda7d28d38e987d007096cbbeed04f14a6662d0eee22"},
{file = "aiohttp-3.12.14-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:0400f0ca9bb3e0b02f6466421f253797f6384e9845820c8b05e976398ac1d81a"},
{file = "aiohttp-3.12.14-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a56809fed4c8a830b5cae18454b7464e1529dbf66f71c4772e3cfa9cbec0a1ff"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:27f2e373276e4755691a963e5d11756d093e346119f0627c2d6518208483fb6d"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:ca39e433630e9a16281125ef57ece6817afd1d54c9f1bf32e901f38f16035869"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9c748b3f8b14c77720132b2510a7d9907a03c20ba80f469e58d5dfd90c079a1c"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f0a568abe1b15ce69d4cc37e23020720423f0728e3cb1f9bcd3f53420ec3bfe7"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9888e60c2c54eaf56704b17feb558c7ed6b7439bca1e07d4818ab878f2083660"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3006a1dc579b9156de01e7916d38c63dc1ea0679b14627a37edf6151bc530088"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:aa8ec5c15ab80e5501a26719eb48a55f3c567da45c6ea5bb78c52c036b2655c7"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:39b94e50959aa07844c7fe2206b9f75d63cc3ad1c648aaa755aa257f6f2498a9"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:04c11907492f416dad9885d503fbfc5dcb6768d90cad8639a771922d584609d3"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:88167bd9ab69bb46cee91bd9761db6dfd45b6e76a0438c7e884c3f8160ff21eb"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:791504763f25e8f9f251e4688195e8b455f8820274320204f7eafc467e609425"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:2785b112346e435dd3a1a67f67713a3fe692d288542f1347ad255683f066d8e0"},
{file = "aiohttp-3.12.14-cp312-cp312-win32.whl", hash = "sha256:15f5f4792c9c999a31d8decf444e79fcfd98497bf98e94284bf390a7bb8c1729"},
{file = "aiohttp-3.12.14-cp312-cp312-win_amd64.whl", hash = "sha256:3b66e1a182879f579b105a80d5c4bd448b91a57e8933564bf41665064796a338"},
{file = "aiohttp-3.12.14-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:3143a7893d94dc82bc409f7308bc10d60285a3cd831a68faf1aa0836c5c3c767"},
{file = "aiohttp-3.12.14-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:3d62ac3d506cef54b355bd34c2a7c230eb693880001dfcda0bf88b38f5d7af7e"},
{file = "aiohttp-3.12.14-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:48e43e075c6a438937c4de48ec30fa8ad8e6dfef122a038847456bfe7b947b63"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:077b4488411a9724cecc436cbc8c133e0d61e694995b8de51aaf351c7578949d"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:d8c35632575653f297dcbc9546305b2c1133391089ab925a6a3706dfa775ccab"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6b8ce87963f0035c6834b28f061df90cf525ff7c9b6283a8ac23acee6502afd4"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f0a2cf66e32a2563bb0766eb24eae7e9a269ac0dc48db0aae90b575dc9583026"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cdea089caf6d5cde975084a884c72d901e36ef9c2fd972c9f51efbbc64e96fbd"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8a7865f27db67d49e81d463da64a59365ebd6b826e0e4847aa111056dcb9dc88"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:0ab5b38a6a39781d77713ad930cb5e7feea6f253de656a5f9f281a8f5931b086"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:9b3b15acee5c17e8848d90a4ebc27853f37077ba6aec4d8cb4dbbea56d156933"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:e4c972b0bdaac167c1e53e16a16101b17c6d0ed7eac178e653a07b9f7fad7151"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:7442488b0039257a3bdbc55f7209587911f143fca11df9869578db6c26feeeb8"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:f68d3067eecb64c5e9bab4a26aa11bd676f4c70eea9ef6536b0a4e490639add3"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f88d3704c8b3d598a08ad17d06006cb1ca52a1182291f04979e305c8be6c9758"},
{file = "aiohttp-3.12.14-cp313-cp313-win32.whl", hash = "sha256:a3c99ab19c7bf375c4ae3debd91ca5d394b98b6089a03231d4c580ef3c2ae4c5"},
{file = "aiohttp-3.12.14-cp313-cp313-win_amd64.whl", hash = "sha256:3f8aad695e12edc9d571f878c62bedc91adf30c760c8632f09663e5f564f4baa"},
{file = "aiohttp-3.12.14-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:b8cc6b05e94d837bcd71c6531e2344e1ff0fb87abe4ad78a9261d67ef5d83eae"},
{file = "aiohttp-3.12.14-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:d1dcb015ac6a3b8facd3677597edd5ff39d11d937456702f0bb2b762e390a21b"},
{file = "aiohttp-3.12.14-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3779ed96105cd70ee5e85ca4f457adbce3d9ff33ec3d0ebcdf6c5727f26b21b3"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:717a0680729b4ebd7569c1dcd718c46b09b360745fd8eb12317abc74b14d14d0"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:b5dd3a2ef7c7e968dbbac8f5574ebeac4d2b813b247e8cec28174a2ba3627170"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:4710f77598c0092239bc12c1fcc278a444e16c7032d91babf5abbf7166463f7b"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f3e9f75ae842a6c22a195d4a127263dbf87cbab729829e0bd7857fb1672400b2"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5f9c8d55d6802086edd188e3a7d85a77787e50d56ce3eb4757a3205fa4657922"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:79b29053ff3ad307880d94562cca80693c62062a098a5776ea8ef5ef4b28d140"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:23e1332fff36bebd3183db0c7a547a1da9d3b4091509f6d818e098855f2f27d3"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_armv7l.whl", hash = "sha256:a564188ce831fd110ea76bcc97085dd6c625b427db3f1dbb14ca4baa1447dcbc"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:a7a1b4302f70bb3ec40ca86de82def532c97a80db49cac6a6700af0de41af5ee"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:1b07ccef62950a2519f9bfc1e5b294de5dd84329f444ca0b329605ea787a3de5"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_s390x.whl", hash = "sha256:938bd3ca6259e7e48b38d84f753d548bd863e0c222ed6ee6ace3fd6752768a84"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:8bc784302b6b9f163b54c4e93d7a6f09563bd01ff2b841b29ed3ac126e5040bf"},
{file = "aiohttp-3.12.14-cp39-cp39-win32.whl", hash = "sha256:a3416f95961dd7d5393ecff99e3f41dc990fb72eda86c11f2a60308ac6dcd7a0"},
{file = "aiohttp-3.12.14-cp39-cp39-win_amd64.whl", hash = "sha256:196858b8820d7f60578f8b47e5669b3195c21d8ab261e39b1d705346458f445f"},
{file = "aiohttp-3.12.14.tar.gz", hash = "sha256:6e06e120e34d93100de448fd941522e11dafa78ef1a893c179901b7d66aa29f2"},
]
[package.dependencies]
aiohappyeyeballs = ">=2.3.0"
aiosignal = ">=1.1.2"
aiohappyeyeballs = ">=2.5.0"
aiosignal = ">=1.4.0"
attrs = ">=17.3.0"
frozenlist = ">=1.1.1"
multidict = ">=4.5,<7.0"
yarl = ">=1.12.0,<2.0"
propcache = ">=0.2.0"
yarl = ">=1.17.0,<2.0"
[package.extras]
speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.2.0) ; sys_platform == \"linux\" or sys_platform == \"darwin\"", "brotlicffi ; platform_python_implementation != \"CPython\""]
speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.3.0)", "brotlicffi ; platform_python_implementation != \"CPython\""]
[[package]]
name = "aiopg"
@@ -145,18 +141,19 @@ sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]
[[package]]
name = "aiosignal"
version = "1.3.1"
version = "1.4.0"
description = "aiosignal: a list of registered asynchronous callbacks"
optional = false
python-versions = ">=3.7"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "aiosignal-1.3.1-py3-none-any.whl", hash = "sha256:f8376fb07dd1e86a584e4fcdec80b36b7f81aac666ebc724e2c090300dd83b17"},
{file = "aiosignal-1.3.1.tar.gz", hash = "sha256:54cd96e15e1649b75d6c87526a6ff0b6c1b0dd3459f43d9ca11d48c339b68cfc"},
{file = "aiosignal-1.4.0-py3-none-any.whl", hash = "sha256:053243f8b92b990551949e63930a839ff0cf0b0ebbe0597b0f3fb19e1a0fe82e"},
{file = "aiosignal-1.4.0.tar.gz", hash = "sha256:f47eecd9468083c2029cc99945502cb7708b082c232f9aca65da147157b251c7"},
]
[package.dependencies]
frozenlist = ">=1.1.0"
typing-extensions = {version = ">=4.2", markers = "python_version < \"3.13\""}
[[package]]
name = "allure-pytest"
@@ -3847,4 +3844,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5"
content-hash = "6a1e8ba06b8194bf28d87fd5e184e2ddc2b4a19dffcbe3953b26da3d55c9212f"

View File

@@ -1,13 +1,11 @@
use std::collections::{HashMap, HashSet, hash_map};
use std::convert::Infallible;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use async_trait::async_trait;
use clashmap::ClashMap;
use clashmap::mapref::one::Ref;
use rand::{Rng, thread_rng};
use tokio::sync::Mutex;
use tokio::time::Instant;
use tracing::{debug, info};
@@ -22,31 +20,23 @@ pub(crate) trait ProjectInfoCache {
fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt);
fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt);
fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt);
async fn decrement_active_listeners(&self);
async fn increment_active_listeners(&self);
}
struct Entry<T> {
created_at: Instant,
expires_at: Instant,
value: T,
}
impl<T> Entry<T> {
pub(crate) fn new(value: T) -> Self {
pub(crate) fn new(value: T, ttl: Duration) -> Self {
Self {
created_at: Instant::now(),
expires_at: Instant::now() + ttl,
value,
}
}
pub(crate) fn get(&self, valid_since: Instant) -> Option<&T> {
(valid_since < self.created_at).then_some(&self.value)
}
}
impl<T> From<T> for Entry<T> {
fn from(value: T) -> Self {
Self::new(value)
pub(crate) fn get(&self) -> Option<&T> {
(self.expires_at > Instant::now()).then_some(&self.value)
}
}
@@ -56,18 +46,12 @@ struct EndpointInfo {
}
impl EndpointInfo {
pub(crate) fn get_role_secret(
&self,
role_name: RoleNameInt,
valid_since: Instant,
) -> Option<RoleAccessControl> {
let controls = self.role_controls.get(&role_name)?;
controls.get(valid_since).cloned()
pub(crate) fn get_role_secret(&self, role_name: RoleNameInt) -> Option<RoleAccessControl> {
self.role_controls.get(&role_name)?.get().cloned()
}
pub(crate) fn get_controls(&self, valid_since: Instant) -> Option<EndpointAccessControl> {
let controls = self.controls.as_ref()?;
controls.get(valid_since).cloned()
pub(crate) fn get_controls(&self) -> Option<EndpointAccessControl> {
self.controls.as_ref()?.get().cloned()
}
pub(crate) fn invalidate_endpoint(&mut self) {
@@ -92,11 +76,8 @@ pub struct ProjectInfoCacheImpl {
project2ep: ClashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
// FIXME(stefan): we need a way to GC the account2ep map.
account2ep: ClashMap<AccountIdInt, HashSet<EndpointIdInt>>,
config: ProjectInfoCacheOptions,
start_time: Instant,
ttl_disabled_since_us: AtomicU64,
active_listeners_lock: Mutex<usize>,
config: ProjectInfoCacheOptions,
}
#[async_trait]
@@ -152,29 +133,6 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
}
}
}
async fn decrement_active_listeners(&self) {
let mut listeners_guard = self.active_listeners_lock.lock().await;
if *listeners_guard == 0 {
tracing::error!("active_listeners count is already 0, something is broken");
return;
}
*listeners_guard -= 1;
if *listeners_guard == 0 {
self.ttl_disabled_since_us
.store(u64::MAX, std::sync::atomic::Ordering::SeqCst);
}
}
async fn increment_active_listeners(&self) {
let mut listeners_guard = self.active_listeners_lock.lock().await;
*listeners_guard += 1;
if *listeners_guard == 1 {
let new_ttl = (self.start_time.elapsed() + self.config.ttl).as_micros() as u64;
self.ttl_disabled_since_us
.store(new_ttl, std::sync::atomic::Ordering::SeqCst);
}
}
}
impl ProjectInfoCacheImpl {
@@ -184,9 +142,6 @@ impl ProjectInfoCacheImpl {
project2ep: ClashMap::new(),
account2ep: ClashMap::new(),
config,
ttl_disabled_since_us: AtomicU64::new(u64::MAX),
start_time: Instant::now(),
active_listeners_lock: Mutex::new(0),
}
}
@@ -203,19 +158,17 @@ impl ProjectInfoCacheImpl {
endpoint_id: &EndpointId,
role_name: &RoleName,
) -> Option<RoleAccessControl> {
let valid_since = self.get_cache_times();
let role_name = RoleNameInt::get(role_name)?;
let endpoint_info = self.get_endpoint_cache(endpoint_id)?;
endpoint_info.get_role_secret(role_name, valid_since)
endpoint_info.get_role_secret(role_name)
}
pub(crate) fn get_endpoint_access(
&self,
endpoint_id: &EndpointId,
) -> Option<EndpointAccessControl> {
let valid_since = self.get_cache_times();
let endpoint_info = self.get_endpoint_cache(endpoint_id)?;
endpoint_info.get_controls(valid_since)
endpoint_info.get_controls()
}
pub(crate) fn insert_endpoint_access(
@@ -237,8 +190,8 @@ impl ProjectInfoCacheImpl {
return;
}
let controls = Entry::from(controls);
let role_controls = Entry::from(role_controls);
let controls = Entry::new(controls, self.config.ttl);
let role_controls = Entry::new(role_controls, self.config.ttl);
match self.cache.entry(endpoint_id) {
clashmap::Entry::Vacant(e) => {
@@ -275,27 +228,6 @@ impl ProjectInfoCacheImpl {
}
}
fn ignore_ttl_since(&self) -> Option<Instant> {
let ttl_disabled_since_us = self
.ttl_disabled_since_us
.load(std::sync::atomic::Ordering::Relaxed);
if ttl_disabled_since_us == u64::MAX {
return None;
}
Some(self.start_time + Duration::from_micros(ttl_disabled_since_us))
}
fn get_cache_times(&self) -> Instant {
let mut valid_since = Instant::now() - self.config.ttl;
if let Some(ignore_ttl_since) = self.ignore_ttl_since() {
// We are fine if entry is not older than ttl or was added before we are getting notifications.
valid_since = valid_since.min(ignore_ttl_since);
}
valid_since
}
pub fn maybe_invalidate_role_secret(&self, endpoint_id: &EndpointId, role_name: &RoleName) {
let Some(endpoint_id) = EndpointIdInt::get(endpoint_id) else {
return;
@@ -313,16 +245,7 @@ impl ProjectInfoCacheImpl {
return;
};
let created_at = role_controls.get().created_at;
let expire = match self.ignore_ttl_since() {
// if ignoring TTL, we should still try and roll the password if it's old
// and we the client gave an incorrect password. There could be some lag on the redis channel.
Some(_) => created_at + self.config.ttl < Instant::now(),
// edge case: redis is down, let's be generous and invalidate the cache immediately.
None => true,
};
if expire {
if role_controls.get().expires_at <= Instant::now() {
role_controls.remove();
}
}

View File

@@ -14,8 +14,8 @@ use std::time::{Duration, Instant};
use hashlink::{LruCache, linked_hash_map::RawEntryMut};
use tracing::debug;
use super::Cache;
use super::common::Cached;
use super::{Cache, timed_lru};
/// An implementation of timed LRU cache with fixed capacity.
/// Key properties:
@@ -30,7 +30,7 @@ use super::{Cache, timed_lru};
///
/// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct.
/// See [`timed_lru::Cached`] for more information.
/// See [`Cached`] for more information.
///
/// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
/// or by a successful lookup (i.e. the entry hasn't expired yet).
@@ -217,15 +217,18 @@ impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
}
impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
/// Retrieve a cached entry in convenient wrapper.
pub(crate) fn get<Q>(&self, key: &Q) -> Option<timed_lru::Cached<&Self>>
/// Retrieve a cached entry in convenient wrapper, alongside timing information.
pub(crate) fn get_with_created_at<Q>(
&self,
key: &Q,
) -> Option<Cached<&Self, (<Self as Cache>::Value, Instant)>>
where
K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized,
{
self.get_raw(key, |key, entry| Cached {
token: Some((self, key.clone())),
value: entry.value.clone(),
value: (entry.value.clone(), entry.created_at),
})
}
}

View File

@@ -23,12 +23,13 @@ use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
};
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason};
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse};
use crate::control_plane::{
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
RoleAccessControl,
};
use crate::metrics::Metrics;
use crate::proxy::retry::CouldRetry;
use crate::rate_limiter::WakeComputeRateLimiter;
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::{compute, http, scram};
@@ -382,16 +383,31 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
macro_rules! check_cache {
() => {
if let Some(cached) = self.caches.node_info.get(&key) {
let (cached, info) = cached.take_value();
let info = info.map_err(|c| {
info!(key = &*key, "found cached wake_compute error");
WakeComputeError::ControlPlane(ControlPlaneError::Message(Box::new(*c)))
})?;
if let Some(cached) = self.caches.node_info.get_with_created_at(&key) {
let (cached, (info, created_at)) = cached.take_value();
return match info {
Err(mut msg) => {
info!(key = &*key, "found cached wake_compute error");
debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone());
return Ok(cached.map(|()| info));
// if retry_delay_ms is set, reduce it by the amount of time it spent in cache
if let Some(status) = &mut msg.status {
if let Some(retry_info) = &mut status.details.retry_info {
retry_info.retry_delay_ms = retry_info
.retry_delay_ms
.saturating_sub(created_at.elapsed().as_millis() as u64)
}
}
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
msg,
)))
}
Ok(info) => {
debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone());
Ok(cached.map(|()| info))
}
};
}
};
}
@@ -434,42 +450,29 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
Ok(cached.map(|()| node))
}
Err(err) => match err {
WakeComputeError::ControlPlane(ControlPlaneError::Message(err)) => {
let Some(status) = &err.status else {
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
};
WakeComputeError::ControlPlane(ControlPlaneError::Message(ref msg)) => {
let retry_info = msg.status.as_ref().and_then(|s| s.details.retry_info);
let reason = status
.details
.error_info
.map_or(Reason::Unknown, |x| x.reason);
// if we can retry this error, do not cache it.
if reason.can_retry() {
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
// If we can retry this error, do not cache it,
// unless we were given a retry delay.
if msg.could_retry() && retry_info.is_none() {
return Err(err);
}
// at this point, we should only have quota errors.
debug!(
key = &*key,
"created a cache entry for the wake compute error"
);
self.caches.node_info.insert_ttl(
key,
Err(err.clone()),
Duration::from_secs(30),
);
let ttl = retry_info.map_or(Duration::from_secs(30), |r| {
Duration::from_millis(r.retry_delay_ms)
});
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)))
self.caches.node_info.insert_ttl(key, Err(msg.clone()), ttl);
Err(err)
}
err => return Err(err),
err => Err(err),
},
}
}

View File

@@ -43,28 +43,35 @@ impl UserFacingError for ControlPlaneError {
}
impl ReportableError for ControlPlaneError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
fn get_error_kind(&self) -> ErrorKind {
match self {
ControlPlaneError::Message(e) => match e.get_reason() {
Reason::RoleProtected => ErrorKind::User,
Reason::ResourceNotFound => ErrorKind::User,
Reason::ProjectNotFound => ErrorKind::User,
Reason::EndpointNotFound => ErrorKind::User,
Reason::BranchNotFound => ErrorKind::User,
Reason::RoleProtected
| Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::EndpointDisabled
| Reason::BranchNotFound
| Reason::InvalidEphemeralEndpointOptions => ErrorKind::User,
Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit,
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::Quota,
Reason::ActiveTimeQuotaExceeded => ErrorKind::Quota,
Reason::ComputeTimeQuotaExceeded => ErrorKind::Quota,
Reason::WrittenDataQuotaExceeded => ErrorKind::Quota,
Reason::DataTransferQuotaExceeded => ErrorKind::Quota,
Reason::LogicalSizeQuotaExceeded => ErrorKind::Quota,
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
Reason::RunningOperations => ErrorKind::ControlPlane,
Reason::ActiveEndpointsLimitExceeded => ErrorKind::ControlPlane,
Reason::Unknown => ErrorKind::ControlPlane,
Reason::NonDefaultBranchComputeTimeExceeded
| Reason::ActiveTimeQuotaExceeded
| Reason::ComputeTimeQuotaExceeded
| Reason::WrittenDataQuotaExceeded
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded
| Reason::ActiveEndpointsLimitExceeded => ErrorKind::Quota,
Reason::ConcurrencyLimitReached
| Reason::LockAlreadyTaken
| Reason::RunningOperations
| Reason::EndpointIdle
| Reason::ProjectUnderMaintenance
| Reason::Unknown => ErrorKind::ControlPlane,
},
ControlPlaneError::Transport(_) => crate::error::ErrorKind::ControlPlane,
ControlPlaneError::Transport(_) => ErrorKind::ControlPlane,
}
}
}
@@ -120,10 +127,10 @@ impl UserFacingError for GetAuthInfoError {
}
impl ReportableError for GetAuthInfoError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
fn get_error_kind(&self) -> ErrorKind {
match self {
Self::BadSecret => crate::error::ErrorKind::ControlPlane,
Self::ApiError(_) => crate::error::ErrorKind::ControlPlane,
Self::BadSecret => ErrorKind::ControlPlane,
Self::ApiError(_) => ErrorKind::ControlPlane,
}
}
}

View File

@@ -126,10 +126,16 @@ pub(crate) enum Reason {
/// or that the subject doesn't have enough permissions to access the requested endpoint.
#[serde(rename = "ENDPOINT_NOT_FOUND")]
EndpointNotFound,
/// EndpointDisabled indicates that the endpoint has been disabled and does not accept connections.
#[serde(rename = "ENDPOINT_DISABLED")]
EndpointDisabled,
/// BranchNotFound indicates that the branch wasn't found, usually due to the provided ID not being correct,
/// or that the subject doesn't have enough permissions to access the requested branch.
#[serde(rename = "BRANCH_NOT_FOUND")]
BranchNotFound,
/// InvalidEphemeralEndpointOptions indicates that the specified LSN or timestamp are wrong.
#[serde(rename = "INVALID_EPHEMERAL_OPTIONS")]
InvalidEphemeralEndpointOptions,
/// RateLimitExceeded indicates that the rate limit for the operation has been exceeded.
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
RateLimitExceeded,
@@ -152,6 +158,9 @@ pub(crate) enum Reason {
/// LogicalSizeQuotaExceeded indicates that the logical size quota was exceeded.
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
LogicalSizeQuotaExceeded,
/// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded.
#[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")]
ActiveEndpointsLimitExceeded,
/// RunningOperations indicates that the project already has some running operations
/// and scheduling of new ones is prohibited.
#[serde(rename = "RUNNING_OPERATIONS")]
@@ -162,9 +171,13 @@ pub(crate) enum Reason {
/// LockAlreadyTaken indicates that the we attempted to take a lock that was already taken.
#[serde(rename = "LOCK_ALREADY_TAKEN")]
LockAlreadyTaken,
/// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded.
#[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")]
ActiveEndpointsLimitExceeded,
/// EndpointIdle indicates that the endpoint cannot become active, because it's idle.
#[serde(rename = "ENDPOINT_IDLE")]
EndpointIdle,
/// ProjectUnderMaintenance indicates that the project is currently ongoing maintenance,
/// and thus cannot accept connections.
#[serde(rename = "PROJECT_UNDER_MAINTENANCE")]
ProjectUnderMaintenance,
#[default]
#[serde(other)]
Unknown,
@@ -184,13 +197,15 @@ impl Reason {
pub(crate) fn can_retry(self) -> bool {
match self {
// do not retry role protected errors
// not a transitive error
// not a transient error
Reason::RoleProtected => false,
// on retry, it will still not be found
// on retry, it will still not be found or valid
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound => false,
| Reason::EndpointDisabled
| Reason::BranchNotFound
| Reason::InvalidEphemeralEndpointOptions => false,
// we were asked to go away
Reason::RateLimitExceeded
| Reason::NonDefaultBranchComputeTimeExceeded
@@ -200,11 +215,13 @@ impl Reason {
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded
| Reason::ActiveEndpointsLimitExceeded => false,
// transitive error. control plane is currently busy
// transient error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations
| Reason::ConcurrencyLimitReached
| Reason::LockAlreadyTaken => true,
| Reason::LockAlreadyTaken
| Reason::EndpointIdle
| Reason::ProjectUnderMaintenance => true,
// unknown error. better not retry it.
Reason::Unknown => false,
}

View File

@@ -1,12 +1,10 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::{env, io};
use chrono::{DateTime, Utc};
use opentelemetry::trace::TraceContextExt;
use serde::ser::{SerializeMap, Serializer};
use tracing::subscriber::Interest;
use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -16,7 +14,9 @@ use tracing_subscriber::fmt::time::SystemTime;
use tracing_subscriber::fmt::{FormatEvent, FormatFields};
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::{LookupSpan, SpanRef};
use tracing_subscriber::registry::LookupSpan;
use crate::metrics::Metrics;
/// Initialize logging and OpenTelemetry tracing and exporter.
///
@@ -210,6 +210,9 @@ struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
/// tracks which fields of each **event** are duplicates
skipped_field_indices: CallsiteMap<SkippedFieldIndices>,
/// tracks callsite names to an ID.
callsite_name_ids: papaya::HashMap<&'static str, u32, ahash::RandomState>,
span_info: CallsiteMap<CallsiteSpanInfo>,
/// Fields we want to keep track of in a separate json object.
@@ -222,6 +225,7 @@ impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
clock,
skipped_field_indices: CallsiteMap::default(),
span_info: CallsiteMap::default(),
callsite_name_ids: papaya::HashMap::default(),
writer,
extract_fields,
}
@@ -232,7 +236,7 @@ impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
self.span_info
.pin()
.get_or_insert_with(metadata.callsite(), || {
CallsiteSpanInfo::new(metadata, self.extract_fields)
CallsiteSpanInfo::new(&self.callsite_name_ids, metadata, self.extract_fields)
})
.clone()
}
@@ -249,7 +253,7 @@ where
// early, before OTel machinery, and add as event extension.
let now = self.clock.now();
let res: io::Result<()> = EVENT_FORMATTER.with(|f| {
EVENT_FORMATTER.with(|f| {
let mut borrow = f.try_borrow_mut();
let formatter = match borrow.as_deref_mut() {
Ok(formatter) => formatter,
@@ -259,31 +263,19 @@ where
Err(_) => &mut EventFormatter::new(),
};
formatter.reset();
formatter.format(
now,
event,
&ctx,
&self.skipped_field_indices,
self.extract_fields,
)?;
self.writer.make_writer().write_all(formatter.buffer())
});
);
// In case logging fails we generate a simpler JSON object.
if let Err(err) = res
&& let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
"timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
"level": "ERROR",
"message": format_args!("cannot log event: {err:?}"),
"fields": {
"event": format_args!("{event:?}"),
},
}))
{
line.push(b'\n');
self.writer.make_writer().write_all(&line).ok();
}
let mut writer = self.writer.make_writer();
if writer.write_all(formatter.buffer()).is_err() {
Metrics::get().proxy.logging_errors_count.inc();
}
});
}
/// Registers a SpanFields instance as span extension.
@@ -356,10 +348,11 @@ struct CallsiteSpanInfo {
}
impl CallsiteSpanInfo {
fn new(metadata: &'static Metadata<'static>, extract_fields: &[&'static str]) -> Self {
// Start at 1 to reserve 0 for default.
static COUNTER: AtomicU32 = AtomicU32::new(1);
fn new(
callsite_name_ids: &papaya::HashMap<&'static str, u32, ahash::RandomState>,
metadata: &'static Metadata<'static>,
extract_fields: &[&'static str],
) -> Self {
let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect();
// get all the indices of span fields we want to focus
@@ -372,8 +365,18 @@ impl CallsiteSpanInfo {
// normalized_name is unique for each callsite, but it is not
// unified across separate proxy instances.
// todo: can we do better here?
let cid = COUNTER.fetch_add(1, Ordering::Relaxed);
let normalized_name = format!("{}#{cid}", metadata.name()).into();
let cid = *callsite_name_ids
.pin()
.update_or_insert(metadata.name(), |&cid| cid + 1, 0);
// we hope that most span names are unique, in which case this will always be 0
let normalized_name = if cid == 0 {
metadata.name().into()
} else {
// if the span name is not unique, add the numeric ID to span name to distinguish it.
// sadly this is non-determinstic, across restarts but we should fix it by disambiguating re-used span names instead.
format!("{}#{cid}", metadata.name()).into()
};
Self {
extract,
@@ -382,9 +385,24 @@ impl CallsiteSpanInfo {
}
}
#[derive(Clone)]
struct RawValue(Box<[u8]>);
impl RawValue {
fn new(v: impl json::ValueEncoder) -> Self {
Self(json::value_to_vec!(|val| v.encode(val)).into_boxed_slice())
}
}
impl json::ValueEncoder for &RawValue {
fn encode(self, v: json::ValueSer<'_>) {
v.write_raw_json(&self.0);
}
}
/// Stores span field values recorded during the spans lifetime.
struct SpanFields {
values: [serde_json::Value; MAX_TRACING_FIELDS],
values: [Option<RawValue>; MAX_TRACING_FIELDS],
/// cached span info so we can avoid extra hashmap lookups in the hot path.
span_info: CallsiteSpanInfo,
@@ -394,7 +412,7 @@ impl SpanFields {
fn new(span_info: CallsiteSpanInfo) -> Self {
Self {
span_info,
values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS],
values: [const { None }; MAX_TRACING_FIELDS],
}
}
}
@@ -402,55 +420,55 @@ impl SpanFields {
impl tracing::field::Visit for SpanFields {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
self.values[field.index()] = serde_json::Value::from(value);
self.values[field.index()] = Some(RawValue::new(value));
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
self.values[field.index()] = serde_json::Value::from(value);
self.values[field.index()] = Some(RawValue::new(value));
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
self.values[field.index()] = serde_json::Value::from(value);
self.values[field.index()] = Some(RawValue::new(value));
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if let Ok(value) = i64::try_from(value) {
self.values[field.index()] = serde_json::Value::from(value);
self.values[field.index()] = Some(RawValue::new(value));
} else {
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
}
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if let Ok(value) = u64::try_from(value) {
self.values[field.index()] = serde_json::Value::from(value);
self.values[field.index()] = Some(RawValue::new(value));
} else {
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
}
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.values[field.index()] = serde_json::Value::from(value);
self.values[field.index()] = Some(RawValue::new(value));
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
self.values[field.index()] = serde_json::Value::from(value);
self.values[field.index()] = Some(RawValue::new(value));
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.values[field.index()] = serde_json::Value::from(value);
self.values[field.index()] = Some(RawValue::new(value));
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.values[field.index()] = serde_json::Value::from(format!("{value:?}"));
self.values[field.index()] = Some(RawValue::new(format_args!("{value:?}")));
}
#[inline]
@@ -459,7 +477,7 @@ impl tracing::field::Visit for SpanFields {
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
}
}
@@ -508,11 +526,6 @@ impl EventFormatter {
&self.logline_buffer
}
#[inline]
fn reset(&mut self) {
self.logline_buffer.clear();
}
fn format<S>(
&mut self,
now: DateTime<Utc>,
@@ -520,8 +533,7 @@ impl EventFormatter {
ctx: &Context<'_, S>,
skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
extract_fields: &'static [&'static str],
) -> io::Result<()>
where
) where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
@@ -536,78 +548,99 @@ impl EventFormatter {
.copied()
.unwrap_or_default();
let mut serialize = || {
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
let mut serializer = serializer.serialize_map(None)?;
self.logline_buffer.clear();
let serializer = json::ValueSer::new(&mut self.logline_buffer);
json::value_as_object!(|serializer| {
// Timestamp comes first, so raw lines can be sorted by timestamp.
serializer.serialize_entry("timestamp", &timestamp)?;
serializer.entry("timestamp", &*timestamp);
// Level next.
serializer.serialize_entry("level", &meta.level().as_str())?;
serializer.entry("level", meta.level().as_str());
// Message next.
serializer.serialize_key("message")?;
let mut message_extractor =
MessageFieldExtractor::new(serializer, skipped_field_indices);
MessageFieldExtractor::new(serializer.key("message"), skipped_field_indices);
event.record(&mut message_extractor);
let mut serializer = message_extractor.into_serializer()?;
message_extractor.finish();
// Direct message fields.
let mut fields_present = FieldsPresent(false, skipped_field_indices);
event.record(&mut fields_present);
if fields_present.0 {
serializer.serialize_entry(
"fields",
&SerializableEventFields(event, skipped_field_indices),
)?;
{
let mut message_skipper = MessageFieldSkipper::new(
serializer.key("fields").object(),
skipped_field_indices,
);
event.record(&mut message_skipper);
// rollback if no fields are present.
if message_skipper.present {
message_skipper.serializer.finish();
}
}
let spans = SerializableSpans {
// collect all spans from parent to root.
spans: ctx
let mut extracted = ExtractedSpanFields::new(extract_fields);
let spans = serializer.key("spans");
json::value_as_object!(|spans| {
let parent_spans = ctx
.event_span(event)
.map_or(vec![], |parent| parent.scope().collect()),
extracted: ExtractedSpanFields::new(extract_fields),
};
serializer.serialize_entry("spans", &spans)?;
.map_or(vec![], |parent| parent.scope().collect());
for span in parent_spans.iter().rev() {
let ext = span.extensions();
// all spans should have this extension.
let Some(fields) = ext.get() else { continue };
extracted.layer_span(fields);
let SpanFields { values, span_info } = fields;
let span_fields = spans.key(&*span_info.normalized_name);
json::value_as_object!(|span_fields| {
for (field, value) in std::iter::zip(span.metadata().fields(), values) {
if let Some(value) = value {
span_fields.entry(field.name(), value);
}
}
});
}
});
// TODO: thread-local cache?
let pid = std::process::id();
// Skip adding pid 1 to reduce noise for services running in containers.
if pid != 1 {
serializer.serialize_entry("process_id", &pid)?;
serializer.entry("process_id", pid);
}
THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
THREAD_ID.with(|tid| serializer.entry("thread_id", tid));
// TODO: tls cache? name could change
if let Some(thread_name) = std::thread::current().name()
&& !thread_name.is_empty()
&& thread_name != "tokio-runtime-worker"
{
serializer.serialize_entry("thread_name", thread_name)?;
serializer.entry("thread_name", thread_name);
}
if let Some(task_id) = tokio::task::try_id() {
serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
serializer.entry("task_id", format_args!("{task_id}"));
}
serializer.serialize_entry("target", meta.target())?;
serializer.entry("target", meta.target());
// Skip adding module if it's the same as target.
if let Some(module) = meta.module_path()
&& module != meta.target()
{
serializer.serialize_entry("module", module)?;
serializer.entry("module", module);
}
if let Some(file) = meta.file() {
if let Some(line) = meta.line() {
serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
serializer.entry("src", format_args!("{file}:{line}"));
} else {
serializer.serialize_entry("src", file)?;
serializer.entry("src", file);
}
}
@@ -616,124 +649,104 @@ impl EventFormatter {
let otel_spanref = otel_context.span();
let span_context = otel_spanref.span_context();
if span_context.is_valid() {
serializer.serialize_entry(
"trace_id",
&format_args!("{}", span_context.trace_id()),
)?;
serializer.entry("trace_id", format_args!("{}", span_context.trace_id()));
}
}
if spans.extracted.has_values() {
if extracted.has_values() {
// TODO: add fields from event, too?
serializer.serialize_entry("extract", &spans.extracted)?;
let extract = serializer.key("extract");
json::value_as_object!(|extract| {
for (key, value) in std::iter::zip(extracted.names, extracted.values) {
if let Some(value) = value {
extract.entry(*key, &value);
}
}
});
}
});
serializer.end()
};
serialize().map_err(io::Error::other)?;
self.logline_buffer.push(b'\n');
Ok(())
}
}
/// Extracts the message field that's mixed will other fields.
struct MessageFieldExtractor<S: serde::ser::SerializeMap> {
serializer: S,
struct MessageFieldExtractor<'buf> {
serializer: Option<json::ValueSer<'buf>>,
skipped_field_indices: SkippedFieldIndices,
state: Option<Result<(), S::Error>>,
}
impl<S: serde::ser::SerializeMap> MessageFieldExtractor<S> {
impl<'buf> MessageFieldExtractor<'buf> {
#[inline]
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
fn new(serializer: json::ValueSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
Self {
serializer,
serializer: Some(serializer),
skipped_field_indices,
state: None,
}
}
#[inline]
fn into_serializer(mut self) -> Result<S, S::Error> {
match self.state {
Some(Ok(())) => {}
Some(Err(err)) => return Err(err),
None => self.serializer.serialize_value("")?,
fn finish(self) {
if let Some(ser) = self.serializer {
ser.value("");
}
Ok(self.serializer)
}
#[inline]
fn accept_field(&self, field: &tracing::field::Field) -> bool {
self.state.is_none()
&& field.name() == MESSAGE_FIELD
fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
if field.name() == MESSAGE_FIELD
&& !self.skipped_field_indices.contains(field.index())
&& let Some(ser) = self.serializer.take()
{
ser.value(v);
}
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<S> {
impl tracing::field::Visit for MessageFieldExtractor<'_> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
self.record_field(field, value);
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
self.record_field(field, value);
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
self.record_field(field, value);
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
self.record_field(field, value);
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
self.record_field(field, value);
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
self.record_field(field, value);
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
}
self.record_field(field, format_args!("{value:x?}"));
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
self.record_field(field, value);
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
}
self.record_field(field, format_args!("{value:?}"));
}
#[inline]
@@ -742,147 +755,83 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtracto
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
}
}
}
/// Checks if there's any fields and field values present. If not, the JSON subobject
/// can be skipped.
// This is entirely optional and only cosmetic, though maybe helps a
// bit during log parsing in dashboards when there's no field with empty object.
struct FieldsPresent(pub bool, SkippedFieldIndices);
// Even though some methods have an overhead (error, bytes) it is assumed the
// compiler won't include this since we ignore the value entirely.
impl tracing::field::Visit for FieldsPresent {
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
if !self.1.contains(field.index())
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
{
self.0 |= true;
}
}
}
/// Serializes the fields directly supplied with a log event.
struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices);
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
use serde::ser::SerializeMap;
let serializer = serializer.serialize_map(None)?;
let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
self.0.record(&mut message_skipper);
let serializer = message_skipper.into_serializer()?;
serializer.end()
self.record_field(field, format_args!("{value}"));
}
}
/// A tracing field visitor that skips the message field.
struct MessageFieldSkipper<S: serde::ser::SerializeMap> {
serializer: S,
struct MessageFieldSkipper<'buf> {
serializer: json::ObjectSer<'buf>,
skipped_field_indices: SkippedFieldIndices,
state: Result<(), S::Error>,
present: bool,
}
impl<S: serde::ser::SerializeMap> MessageFieldSkipper<S> {
impl<'buf> MessageFieldSkipper<'buf> {
#[inline]
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
fn new(serializer: json::ObjectSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
Self {
serializer,
skipped_field_indices,
state: Ok(()),
present: false,
}
}
#[inline]
fn accept_field(&self, field: &tracing::field::Field) -> bool {
self.state.is_ok()
&& field.name() != MESSAGE_FIELD
fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
if field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
&& !self.skipped_field_indices.contains(field.index())
}
#[inline]
fn into_serializer(self) -> Result<S, S::Error> {
self.state?;
Ok(self.serializer)
{
self.serializer.entry(field.name(), v);
self.present |= true;
}
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<S> {
impl tracing::field::Visit for MessageFieldSkipper<'_> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
self.record_field(field, value);
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
self.record_field(field, value);
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
self.record_field(field, value);
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
self.record_field(field, value);
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
self.record_field(field, value);
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
self.record_field(field, value);
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
if self.accept_field(field) {
self.state = self
.serializer
.serialize_entry(field.name(), &format_args!("{value:x?}"));
}
self.record_field(field, format_args!("{value:x?}"));
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
self.record_field(field, value);
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if self.accept_field(field) {
self.state = self
.serializer
.serialize_entry(field.name(), &format_args!("{value:?}"));
}
self.record_field(field, format_args!("{value:?}"));
}
#[inline]
@@ -891,131 +840,40 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
if self.accept_field(field) {
self.state = self.serializer.serialize_value(&format_args!("{value}"));
}
}
}
/// Serializes the span stack from root to leaf (parent of event) as object
/// with the span names as keys. To prevent collision we append a numberic value
/// to the name. Also, collects any span fields we're interested in. Last one
/// wins.
struct SerializableSpans<'ctx, S>
where
S: for<'lookup> LookupSpan<'lookup>,
{
spans: Vec<SpanRef<'ctx, S>>,
extracted: ExtractedSpanFields,
}
impl<S> serde::ser::Serialize for SerializableSpans<'_, S>
where
S: for<'lookup> LookupSpan<'lookup>,
{
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
where
Ser: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
for span in self.spans.iter().rev() {
let ext = span.extensions();
// all spans should have this extension.
let Some(fields) = ext.get() else { continue };
self.extracted.layer_span(fields);
let SpanFields { values, span_info } = fields;
serializer.serialize_entry(
&*span_info.normalized_name,
&SerializableSpanFields {
fields: span.metadata().fields(),
values,
},
)?;
}
serializer.end()
}
}
/// Serializes the span fields as object.
struct SerializableSpanFields<'span> {
fields: &'span tracing::field::FieldSet,
values: &'span [serde_json::Value; MAX_TRACING_FIELDS],
}
impl serde::ser::Serialize for SerializableSpanFields<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
for (field, value) in std::iter::zip(self.fields, self.values) {
if value.is_null() {
continue;
}
serializer.serialize_entry(field.name(), value)?;
}
serializer.end()
self.record_field(field, format_args!("{value}"));
}
}
struct ExtractedSpanFields {
names: &'static [&'static str],
values: RefCell<Vec<serde_json::Value>>,
values: Vec<Option<RawValue>>,
}
impl ExtractedSpanFields {
fn new(names: &'static [&'static str]) -> Self {
ExtractedSpanFields {
names,
values: RefCell::new(vec![serde_json::Value::Null; names.len()]),
values: vec![None; names.len()],
}
}
fn layer_span(&self, fields: &SpanFields) {
let mut v = self.values.borrow_mut();
fn layer_span(&mut self, fields: &SpanFields) {
let SpanFields { values, span_info } = fields;
// extract the fields
for (i, &j) in span_info.extract.iter().enumerate() {
let Some(value) = values.get(j) else { continue };
let Some(Some(value)) = values.get(j) else {
continue;
};
if !value.is_null() {
// TODO: replace clone with reference, if possible.
v[i] = value.clone();
}
// TODO: replace clone with reference, if possible.
self.values[i] = Some(value.clone());
}
}
#[inline]
fn has_values(&self) -> bool {
self.values.borrow().iter().any(|v| !v.is_null())
}
}
impl serde::ser::Serialize for ExtractedSpanFields {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
let values = self.values.borrow();
for (key, value) in std::iter::zip(self.names, &*values) {
if value.is_null() {
continue;
}
serializer.serialize_entry(key, value)?;
}
serializer.end()
self.values.iter().any(|v| v.is_some())
}
}
@@ -1070,6 +928,7 @@ mod tests {
clock: clock.clone(),
skipped_field_indices: papaya::HashMap::default(),
span_info: papaya::HashMap::default(),
callsite_name_ids: papaya::HashMap::default(),
writer: buffer.clone(),
extract_fields: &["x"],
};
@@ -1078,14 +937,16 @@ mod tests {
tracing::subscriber::with_default(registry, || {
info_span!("some_span", x = 24).in_scope(|| {
info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
tracing::error!(
a = 1,
a = 2,
a = 3,
message = "explicit message field",
"implicit message field"
);
info_span!("some_other_span", y = 30).in_scope(|| {
info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
tracing::error!(
a = 1,
a = 2,
a = 3,
message = "explicit message field",
"implicit message field"
);
});
});
});
});
@@ -1104,12 +965,15 @@ mod tests {
"a": 3,
},
"spans": {
"some_span#1":{
"some_span":{
"x": 24,
},
"some_span#2": {
"some_other_span": {
"y": 30,
},
"some_span#1": {
"x": 42,
}
},
},
"extract": {
"x": 42,

View File

@@ -112,6 +112,9 @@ pub struct ProxyMetrics {
/// Number of bytes sent/received between all clients and backends.
pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
/// Number of IO errors while logging.
pub logging_errors_count: Counter,
/// Number of errors by a given classification.
pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,

View File

@@ -110,7 +110,7 @@ where
debug!(error = ?err, COULD_NOT_CONNECT);
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// If we just received this from cplane and not from the cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if !should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(

View File

@@ -195,15 +195,18 @@ impl NeonOptions {
// proxy options:
/// `PARAMS_COMPAT` allows opting in to forwarding all startup parameters from client to compute.
pub const PARAMS_COMPAT: &str = "proxy_params_compat";
pub const PARAMS_COMPAT: &'static str = "proxy_params_compat";
// cplane options:
/// `LSN` allows provisioning an ephemeral compute with time-travel to the provided LSN.
const LSN: &str = "lsn";
const LSN: &'static str = "lsn";
/// `TIMESTAMP` allows provisioning an ephemeral compute with time-travel to the provided timestamp.
const TIMESTAMP: &'static str = "timestamp";
/// `ENDPOINT_TYPE` allows configuring an ephemeral compute to be read_only or read_write.
const ENDPOINT_TYPE: &str = "endpoint_type";
const ENDPOINT_TYPE: &'static str = "endpoint_type";
pub(crate) fn parse_params(params: &StartupMessageParams) -> Self {
params
@@ -228,6 +231,7 @@ impl NeonOptions {
// This is not a cplane option, we know it does not create ephemeral computes.
Self::PARAMS_COMPAT => false,
Self::LSN => true,
Self::TIMESTAMP => true,
Self::ENDPOINT_TYPE => true,
// err on the side of caution. any cplane options we don't know about
// might lead to ephemeral computes.

View File

@@ -265,10 +265,7 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
return Ok(());
}
let mut conn = match try_connect(&redis).await {
Ok(conn) => {
handler.cache.increment_active_listeners().await;
conn
}
Ok(conn) => conn,
Err(e) => {
tracing::error!(
"failed to connect to redis: {e}, will try to reconnect in {RECONNECT_TIMEOUT:#?}"
@@ -287,11 +284,9 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
}
}
if cancellation_token.is_cancelled() {
handler.cache.decrement_active_listeners().await;
return Ok(());
}
}
handler.cache.decrement_active_listeners().await;
}
}

View File

@@ -32,7 +32,7 @@ psutil = "^5.9.4"
types-psutil = "^5.9.5.12"
types-toml = "^0.10.8.6"
pytest-httpserver = "^1.0.8"
aiohttp = "3.10.11"
aiohttp = "3.12.14"
pytest-rerunfailures = "^15.0"
types-pytest-lazy-fixture = "^0.6.3.3"
pytest-split = "^0.8.1"

View File

@@ -58,6 +58,7 @@ metrics.workspace = true
pem.workspace = true
postgres_backend.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
postgres_versioninfo.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true
@@ -71,6 +72,7 @@ http-utils.workspace = true
utils.workspace = true
wal_decoder.workspace = true
env_logger.workspace = true
nix.workspace = true
workspace_hack.workspace = true

View File

@@ -17,8 +17,9 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::set_build_info_metric;
use remote_storage::RemoteStorageConfig;
use safekeeper::defaults::{
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT,
DEFAULT_GLOBAL_DISK_CHECK_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR,
DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES, DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
@@ -42,6 +43,12 @@ use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR};
use utils::sentry_init::init_sentry;
use utils::{pid_file, project_build_tag, project_git_version, tcp_listener};
use safekeeper::hadron::{
GLOBAL_DISK_LIMIT_EXCEEDED, get_filesystem_capacity, get_filesystem_usage,
};
use safekeeper::metrics::GLOBAL_DISK_UTIL_CHECK_SECONDS;
use std::sync::atomic::Ordering;
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
@@ -256,6 +263,15 @@ struct Args {
/* BEGIN_HADRON */
#[arg(long)]
enable_pull_timeline_on_startup: bool,
/// How often to scan entire data-dir for total disk usage
#[arg(long, value_parser=humantime::parse_duration, default_value = DEFAULT_GLOBAL_DISK_CHECK_INTERVAL)]
global_disk_check_interval: Duration,
/// The portion of the filesystem capacity that can be used by all timelines.
/// A circuit breaker will trip and reject all WAL writes if the total usage
/// exceeds this ratio.
/// Set to 0 to disable the global disk usage limit.
#[arg(long, default_value_t = DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO)]
max_global_disk_usage_ratio: f64,
/* END_HADRON */
}
@@ -444,6 +460,8 @@ async fn main() -> anyhow::Result<()> {
advertise_pg_addr_tenant_only: None,
enable_pull_timeline_on_startup: args.enable_pull_timeline_on_startup,
hcc_base_url: None,
global_disk_check_interval: args.global_disk_check_interval,
max_global_disk_usage_ratio: args.max_global_disk_usage_ratio,
/* END_HADRON */
});
@@ -618,6 +636,49 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.map(|res| ("Timeline map housekeeping".to_owned(), res));
tasks_handles.push(Box::pin(timeline_housekeeping_handle));
/* BEGIN_HADRON */
// Spawn global disk usage watcher task, if a global disk usage limit is specified.
let interval = conf.global_disk_check_interval;
let data_dir = conf.workdir.clone();
// Use the safekeeper data directory to compute filesystem capacity. This only runs once on startup, so
// there is little point to continue if we can't have the proper protections in place.
let fs_capacity_bytes = get_filesystem_capacity(data_dir.as_std_path())
.expect("Failed to get filesystem capacity for data directory");
let limit: u64 = (conf.max_global_disk_usage_ratio * fs_capacity_bytes as f64) as u64;
if limit > 0 {
let disk_usage_watch_handle = BACKGROUND_RUNTIME
.handle()
.spawn(async move {
// Use Tokio interval to preserve fixed cadence between filesystem utilization checks
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
let data_dir_clone = data_dir.clone();
let check_start = Instant::now();
let usage = tokio::task::spawn_blocking(move || {
get_filesystem_usage(data_dir_clone.as_std_path())
})
.await
.unwrap_or(0);
let elapsed = check_start.elapsed().as_secs_f64();
GLOBAL_DISK_UTIL_CHECK_SECONDS.observe(elapsed);
if usage > limit {
warn!(
"Global disk usage exceeded limit. Usage: {} bytes, limit: {} bytes",
usage, limit
);
}
GLOBAL_DISK_LIMIT_EXCEEDED.store(usage > limit, Ordering::Relaxed);
}
})
.map(|res| ("Global disk usage watcher".to_string(), res));
tasks_handles.push(Box::pin(disk_usage_watch_handle));
}
/* END_HADRON */
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
let wal_service_handle = current_thread_rt
.as_ref()

Some files were not shown because too many files have changed in this diff Show More