Compare commits

...

36 Commits

Author SHA1 Message Date
Conrad Ludgate
e0f70a97cc use cargo-chef for compute-tools 2025-06-17 17:29:28 +01:00
Konstantin Knizhnik
dfa055f4be Support event trigger for Neon users (#10624)
## Problem

https://github.com/neondatabase/neon/issues/7570

Even triggers are supported only for superusers.

## Summary of changes

Temporary switch to superuser when even trigger is created and disable
execution of user's even triggers under superuser.

---------

Co-authored-by: Dimitri Fontaine <dim@tapoueh.org>
Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-17 15:44:50 +00:00
Erik Grinaker
a4c76740c0 pageserver: emit gRPC GetPage errors as responses (#12255)
## Problem

When converting `proto::GetPageRequest` into `page_api::GetPageRequest`
and validating the request, errors are returned as `tonic::Status`. This
will tear down the GetPage stream, which is disruptive and unnecessary.

## Summary of changes

Emit invalid request errors as `GetPageResponse` with an appropriate
`status_code` instead.

Also move the conversion from `tonic::Status` to `GetPageResponse` out
into the stream handler.
2025-06-17 15:41:17 +00:00
Dmitrii Kovalkov
f2e96b2323 tests: prepare test_compatibility.py for --timelines-onto-safekeepers (#12204)
## Problem
Compatibility tests may be run against a compatibility snapshot
generated with --timelines-onto-safekeepers=false. We need to start the
compute without a generation (or with 0 generation) if the timeline is
not storcon-managed, otherwise the compute will hang.

- Follow up on https://github.com/neondatabase/neon/pull/12203
- Relates to https://github.com/neondatabase/neon/pull/11712

## Summary of changes
- Handle compatibility snapshot generated with no
`--timelines-onot-safekeepers` properly
2025-06-17 15:16:07 +00:00
Dmitrii Kovalkov
dee73f0cb4 pageserver: implement max_total_size_bytes limit for basebackup cache (#12230)
## Problem
The cache was introduced as a hackathon project and the only supported
limit was the number of entries.
The basebackup entry size may vary. We need to have more control over
disk space usage to ship it to production.

- Part of https://github.com/neondatabase/cloud/issues/29353

## Summary of changes
- Store the size of entries in the cache and use it to limit
`max_total_size_bytes`
- Add the size of the cache in bytes to metrics.
2025-06-17 15:08:59 +00:00
Erik Grinaker
edf51688bc neon_local: support gRPC connstrings for endpoints (#12271)
## Problem

`neon_local` should support endpoints using gRPC, by providing `grpc://`
connstrings with the Pageservers' gRPC ports.

Requires #12268.
Touches #11926.

## Summary of changes

* Add `--grpc` switch for `neon_local endpoint create`.
* Generate `grpc://` connstrings for endpoints when enabled.

Computes don't actually support `grpc://` connstrings yet, but will
soon.

gRPC is configured when the endpoint is created, not when it's started,
such that it continues to use gRPC across restarts and reconfigurations.
In particular, this is necessary for the storage controller's local
notify hook, which can't easily plumb through gRPC configuration from
the start/reconfigure commands but has access to the endpoint's
configuration.
2025-06-17 14:39:42 +00:00
Aleksandr Sarantsev
4a8f3508f9 storcon: Add safekeeper request label group (#12239)
## Problem

The metrics `storage_controller_safekeeper_request_error` and
`storage_controller_safekeeper_request_latency` currently use
`pageserver_id` as a label.
This can be misleading, as the metrics are about safekeeper requests.  
We want to replace this with a more accurate label — either
`safekeeper_id` or `node_id`.

## Summary of changes

- Introduced `SafekeeperRequestLabelGroup` with `safekeeper_id`.
- Updated the affected metrics to use the new label group.
- Fixed incorrect metric usage in safekeeper_client.rs

## Follow-up

- Review usage of these metrics in alerting rules and existing Grafana
dashboards to ensure this change does not break something.
2025-06-17 13:33:01 +00:00
Erik Grinaker
48052477b4 storcon: register Pageserver gRPC address (#12268)
## Problem

Pageservers now expose a gRPC API on a separate address and port. This
must be registered with the storage controller such that it can be
plumbed through to the compute via cplane.

Touches #11926.

## Summary of changes

This patch registers the gRPC address and port with the storage
controller:

* Add gRPC address to `nodes` database table and `NodePersistence`, with
a Diesel migration.
* Add gRPC address in `NodeMetadata`, `NodeRegisterRequest`,
`NodeDescribeResponse`, and `TenantLocateResponseShard`.
* Add gRPC address flags to `storcon_cli node-register`.

These changes are backwards-compatible, since all structs will ignore
unknown fields during deserialization.
2025-06-17 13:27:10 +00:00
Erik Grinaker
d81353b2d1 pageserver: gRPC base backup fixes (#12243)
## Problem

The gRPC base backup implementation has a few issues: chunks are not
properly bounded, and it's not possible to omit the LSN.

Touches #11728.

## Summary of changes

* Properly bound chunks by using a limited writer.
* Use an `Option<Lsn>` rather than a `ReadLsn` (the latter requires an
LSN).
2025-06-17 12:37:43 +00:00
Aleksandr Sarantsev
143500dc4f storcon: Improve stably_attached readability (#12249)
## Problem

The `stably_attached` function is hard to read due to deeply nested
conditionals

## Summary of Changes

- Refactored `stably_attached` to use early returns and the `?` operator
for improved readability
2025-06-17 10:10:10 +00:00
Aleksandr Sarantsev
1a5f7ce6ad storcon: Exclude another secondaries while optimizing secondary (#12251)
## Problem

If the node intent includes more than one secondary, we can generate a
replace optimization using a candidate node that is already a secondary
location.

## Summary of changes

- Exclude all other secondary nodes from the scoring process to ensure
optimal candidate selection.
2025-06-17 10:09:55 +00:00
Alexander Lakhin
01ccb34118 Don't rerun failed tests in 'Build and Test with Sanitizers' workflow (#12259)
## Problem

We could easily miss a sanitizer-detected defect, if it occurred due to
some race condition, as we just rerun the test and if it succeeds, the
overall test run is considered successful. It was more reasonable
before, when we had much more unstable tests in main, but now we can
track all test failures.

## Summary of changes
Don't rerun failed tests.
2025-06-17 08:08:43 +00:00
Tristan Partin
f669e18477 Remove TODO comment related to default_transaction_read_only (#12261)
This code has been deployed for a while, so let's remove the TODO, and
remove the option passed from the control plane.

Link: https://github.com/neondatabase/cloud/pull/30274

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-16 19:38:26 +00:00
Suhas Thalanki
632cde7f13 schema and github workflow for validation of compute manifest (#12069)
Adds a schema to validate the manifest.yaml described in [this
RFC](https://github.com/neondatabase/neon/blob/main/docs/rfcs/038-independent-compute-release.md)
and a github workflow to test this.
2025-06-16 19:30:41 +00:00
Alexander Lakhin
118e13438d Add "Build and Test Fully" workflow (#11931)
## Problem

We don't test debug builds for v14..v16 in the regular "Build and Test"
runs to perform the testing faster, but it means we can't detect
assertion failures in those versions.
(See https://github.com/neondatabase/neon/issues/11891,
https://github.com/neondatabase/neon/issues/11997)

## Summary of changes
Add a new workflow to test all build types and all versions on all
architectures.
2025-06-16 13:29:39 +00:00
Trung Dinh
fc136eec8f pagectl: add dump layer local (#12245)
## Problem
In our environment, we don't always have access to the pagectl tool on
the pageserver. We have to download the page files to local env to
introspect them. Hence, it'll be useful to be able to parse the local
files using `pagectl`.

## Summary of changes
* Add `dump-layer-local` to `pagectl` that takes a local path as
argument and returns the layer content:
```
cargo  run -p pagectl layer dump-layer-local ~/Desktop/000000067F000040490002800000FFFFFFFF-030000000000000000000000000000000002__00003E7A53EDE611-00003E7AF27BFD19-v1-00000001
```

* Bonus: Fix a bug in `pageserver/ctl/src/draw_timeline_dir.rs` in which
we don't filter out temporary files.
2025-06-16 10:29:42 +00:00
Erik Grinaker
818e5130f1 page_api: add a few derives (#12253)
## Problem

The `page_api` domain types are missing a few derives.

## Summary of changes

Add `Clone`, `Copy`, and `Debug` derives for all types where
appropriate.
2025-06-16 09:45:50 +00:00
Alexander Sarantcev
c243521ae5 Fix reconcile_long_running metric comment (#12234)
## Problem

Comment for `storage_controller_reconcile_long_running` metric was
copy-pasted and not updated in #9207

## Summary of changes

- Fixed comment
2025-06-16 05:51:57 +00:00
Alexander Sarantcev
5303c71589 Move comment above metrics handler (#12236)
## Problem

Comment is in incorrect place: `/metrics` code is above its description
comment.

## Summary of changes

- `/metrics` code is now below the comment
2025-06-13 18:18:51 +00:00
Alexander Sarantcev
d146897415 Fix reconciles metrics typo (#12235)
## Problem

Need to fix naming `safkeeper` -> `safekeeper`

## Summary of changes

- `storage_controller_safkeeper_reconciles_*` renamed to
`storage_controller_safekeeper_reconciles_*`
2025-06-13 17:47:09 +00:00
Alexander Sarantcev
d63815fa40 Fix ChaosInjector shard eligibility bug (#12231)
## Problem

ChaosInjector is intended to skip non-active scheduling policies, but
the current logic skips active shards instead.

## Summary of changes

- Fixed shard eligibility condition to correctly allow chaos injection
for shards with an Active scheduling policy.
2025-06-13 13:34:29 +00:00
Dmitrii Kovalkov
385324ee8a pageserver: fix post-merge PR comments on basebackup cache (#12216)
## Problem
This PR addresses all but the direct IO post-merge comments on
basebackup cache implementation.

- Follow up on
https://github.com/neondatabase/neon/pull/11989#pullrequestreview-2867966119
- Part of https://github.com/neondatabase/cloud/issues/29353

## Summary of changes
- Clean up the tmp directory by recreating it.
- Recreate the tmp directory on startup.
- Add comments why it's safe to not fsync the inode after renaming.
2025-06-13 08:49:31 +00:00
Alex Chi Z.
8a68d463f6 feat(pagectl): no max key limit if time travel recover locally (#12222)
## Problem

We would easily hit this limit for a tenant running for enough long
time.

## Summary of changes

Remove the max key limit for time-travel recovery if the command is
running locally.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-13 08:41:10 +00:00
Alex Chi Z.
3046c307da feat(posthog_client): support feature flag secure API (#12201)
## Problem

Part of #11813 

PostHog has two endpoints to retrieve feature flags: the old project ID
one that uses personal API token, and the new one using a special
feature flag secure token that can only retrieve feature flag. The new
API I added in this patch is not documented in the PostHog API doc but
it's used in their Python SDK.

## Summary of changes

Add support for "feature flag secure token API". The API has no way of
providing a project ID so we verify if the retrieved spec is consistent
with the project ID specified by comparing the `team_id` field.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-13 07:22:02 +00:00
Dmitrii Kovalkov
e83f1d8ba5 tests: prepare test_historic_storage_formats for --timelines-onto-safekeepers (#12214)
## Problem
`test_historic_storage_formats` uses `/tenant_import` to import historic
data. Tenant import does not create timelines onto safekeepers, because
they might already exist on some safekeeper set. If it does, then we may
end up with two different quorums accepting WAL for the same timeline.

If the tenant import is used in a real deployment, the administrator is
responsible for looking for the proper safekeeper set and migrate
timelines into storcon-managed timelines.

- Relates to https://github.com/neondatabase/neon/pull/11712

## Summary of changes
- Create timelines onto safekeepers manually after tenant import in
`test_historic_storage_formats`
- Add a note to tenant import that timelines will be not storcon-managed
after the import.
2025-06-13 06:28:18 +00:00
Trung Dinh
8917676e86 Improve logging for gc-compaction (#12219)
## Problem
* Inside `compact_with_gc_inner`, there is a similar log line:

db24ba95d1/pageserver/src/tenant/timeline/compaction.rs (L3181-L3187)

* Also, I think it would be useful when debugging to have the ability to
select a particular sub-compaction job (e.g., `1/100`) to see all the
logs for that job.

## Summary of changes
* Attach a span to the `compact_with_gc_inner`. 

CC: @skyzh
2025-06-13 06:07:18 +00:00
Ivan Efremov
43acabd4c2 [proxy]: Improve backoff strategy for redis reconnection (#12218)
Sometimes during a failed redis connection attempt at the init stage
proxy pod can continuously restart. This, in turn, can aggravate the
problem if redis is overloaded.

Solves the #11114
2025-06-12 19:46:02 +00:00
Vlad Lazar
db24ba95d1 pagserver: always persist shard identity (#12217)
## Problem

The location config (which includes the stripe size) is stored on
pageserver disk.
For unsharded tenants we [do not include the shard identity in the
serialized
description](ad88ec9257/pageserver/src/tenant/config.rs (L64-L66)).
When the pageserver restarts, it reads that configuration and will use
the stripe size from there
and rely on storcon input from reattach for generation and mode.

The default deserialization is ShardIdentity::unsharded. This has the
new default stripe size of 2048.
Hence, for unsharded tenants we can be running with a stripe size
different from that the one in the
storcon observed state. This is not a problem until we shard split
without specifying a stripe size (i.e. manual splits via the UI or
storcon_cli). When that happens the new shards will use the 2048 stripe
size until storcon realises and switches them back. At that point it's
too late, since we've ingested data with the wrong stripe sizes.

## Summary of changes

Ideally, we would always have the full shard identity on disk. To
achieve this over two releases we do:
1. Always persist the shard identity in the location config on the PS.
2. Storage controller includes the stripe size to use in the re attach
response.

After the first release, we will start persisting correct stripe sizes
for any tenant shard that the storage controller
explicitly sends a location_conf. After the second release, the
re-attach change kicks in and we'll persist the
shard identity for all shards.
2025-06-12 17:15:02 +00:00
Folke Behrens
1dce65308d Update base64 to 0.22 (#12215)
## Problem

Base64 0.13 is outdated.

## Summary of changes

Update base64 to 0.22. Affects mostly proxy and proxy libs. Also upgrade
serde_with to remove another dep on base64 0.13 from dep tree.
2025-06-12 16:12:47 +00:00
Alex Chi Z.
ad88ec9257 fix(pageserver): extend layer manager read guard threshold (#12211)
## Problem

Follow up of https://github.com/neondatabase/neon/pull/12194 to make the
benchmarks run without warnings.

## Summary of changes

Extend read guard hold timeout to 30s.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-12 08:39:54 +00:00
Dmitrii Kovalkov
60dfdf39c7 tests: prepare test_tenant_delete_stale_shards for --timelines-onto-safekeepers (#12198)
## Problem
The test creates an endpoint and deletes its tenant. The compute cannot
stop gracefully because it tries to write a checkpoint shutdown record
into the WAL, but the timeline had been already deleted from
safekeepers.

- Relates to https://github.com/neondatabase/neon/pull/11712

## Summary of changes
Stop the compute before deleting a tenant
2025-06-12 08:10:22 +00:00
Dmitrii Kovalkov
3d5e2bf685 storcon: add tenant_timeline_locate handler (#12203)
## Problem
Compatibility tests may be run against a compatibility snapshot
generated with `--timelines-onto-safekeepers=false`. We need to start
the compute without a generation (or with 0 generation) if the timeline
is not storcon-managed, otherwise the compute will hang.

This handler is needed to check if the timeline is storcon-managed.
It's also needed for better test coverage of safekeeper migration code.

- Relates to https://github.com/neondatabase/neon/pull/11712

## Summary of changes
- Implement `tenant_timeline_locate` handler in storcon to get
safekeeper info from storcon's DB
2025-06-12 08:09:57 +00:00
dependabot[bot]
54fdcfdfa8 build(deps): bump requests from 2.32.3 to 2.32.4 in the pip group across 1 directory (#12180)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-06-11 21:09:05 +00:00
Vlad Lazar
28e882a80f pageserver: warn on long layer manager locking intervals (#12194)
## Problem

We hold the layer map for too long on occasion.

## Summary of changes

This should help us identify the places where it's happening from.

Related https://github.com/neondatabase/neon/issues/12182
2025-06-11 16:16:30 +00:00
Konstantin Knizhnik
24038033bf Remove default from DROP FUNCTION (#12202)
## Problem

DROP FUNCTION doesn't allow to specify default for parameters.

## Summary of changes

Remove DEFAULT clause from pgxn/neon/neon--1.6--1.5.sql

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-11 13:16:58 +00:00
Mikhail
1b935b1958 endpoint_storage: add ?from_endpoint= to /lfc/prewarm (#12195)
Related: https://github.com/neondatabase/cloud/issues/24225
Add optional from_endpoint parameter to allow prewarming from other
endpoint
2025-06-10 19:25:32 +00:00
103 changed files with 2788 additions and 695 deletions

View File

@@ -38,6 +38,11 @@ on:
required: false
default: 1
type: number
rerun-failed:
description: 'rerun failed tests to ignore flaky tests'
required: false
default: true
type: boolean
defaults:
run:
@@ -379,7 +384,7 @@ jobs:
- name: Pytest regression tests
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
uses: ./.github/actions/run-python-test-set
timeout-minutes: ${{ inputs.sanitizers != 'enabled' && 75 || 180 }}
timeout-minutes: ${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 75 || 180 }}
with:
build_type: ${{ inputs.build-type }}
test_selection: regress
@@ -387,14 +392,14 @@ jobs:
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
rerun_failed: ${{ inputs.test-run-count == 1 }}
rerun_failed: ${{ inputs.rerun-failed }}
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
extra_params: --session-timeout=${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -58,6 +58,7 @@ jobs:
test-cfg: ${{ inputs.pg-versions }}
test-selection: ${{ inputs.test-selection }}
test-run-count: ${{ fromJson(inputs.run-count) }}
rerun-failed: false
secrets: inherit
create-test-report:

View File

@@ -199,6 +199,28 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
secrets: inherit
validate-compute-manifest:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up Node.js
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: '24'
- name: Validate manifest against schema
run: |
make -C compute manifest-schema-validation
build-and-test-locally:
needs: [ meta, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes.

View File

@@ -0,0 +1,151 @@
name: Build and Test Fully
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 3 * * *' # run once a day, timezone is utc
workflow_dispatch:
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
jobs:
tag:
runs-on: [ self-hosted, small ]
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
- name: Get build tag
run: |
echo run:$GITHUB_RUN_ID
echo ref:$GITHUB_REF_NAME
echo rev:$(git rev-list --count HEAD)
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
echo "tag=release-compute-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release', 'release-proxy', 'release-compute'"
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
fi
shell: bash
id: build-tag
build-build-tools-image:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
build-and-test-locally:
needs: [ tag, build-build-tools-image ]
strategy:
fail-fast: false
matrix:
arch: [ x64, arm64 ]
build-type: [ debug, release ]
uses: ./.github/workflows/_build-and-test-locally.yml
with:
arch: ${{ matrix.arch }}
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
rerun-failed: false
test-cfg: '[{"pg_version":"v14", "lfc_state": "with-lfc"},
{"pg_version":"v15", "lfc_state": "with-lfc"},
{"pg_version":"v16", "lfc_state": "with-lfc"},
{"pg_version":"v17", "lfc_state": "with-lfc"},
{"pg_version":"v14", "lfc_state": "without-lfc"},
{"pg_version":"v15", "lfc_state": "without-lfc"},
{"pg_version":"v16", "lfc_state": "without-lfc"},
{"pg_version":"v17", "lfc_state": "withouts-lfc"}]'
secrets: inherit
create-test-report:
needs: [ build-and-test-locally, build-build-tools-image ]
if: ${{ !cancelled() }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
if: ${{ !cancelled() }}
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const report = {
reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const coverage = {}
const script = require("./scripts/comment-test-report.js")
await script({
github,
context,
fetch,
report,
coverage,
})

View File

@@ -79,6 +79,7 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
rerun-failed: false
test-cfg: '[{"pg_version":"v17"}]'
sanitizers: enabled
secrets: inherit

43
Cargo.lock generated
View File

@@ -753,6 +753,7 @@ dependencies = [
"axum",
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
"headers",
"http 1.1.0",
@@ -761,6 +762,8 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"serde_html_form",
"serde_path_to_error",
"tower 0.5.2",
"tower-layer",
"tower-service",
@@ -900,12 +903,6 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.7"
@@ -1297,7 +1294,7 @@ dependencies = [
"aws-smithy-types",
"axum",
"axum-extra",
"base64 0.13.1",
"base64 0.22.1",
"bytes",
"camino",
"cfg-if",
@@ -1423,7 +1420,7 @@ name = "control_plane"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.13.1",
"base64 0.22.1",
"camino",
"clap",
"comfy-table",
@@ -4815,7 +4812,7 @@ dependencies = [
name = "postgres-protocol2"
version = "0.1.0"
dependencies = [
"base64 0.20.0",
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
@@ -5187,7 +5184,7 @@ dependencies = [
"aws-config",
"aws-sdk-iam",
"aws-sigv4",
"base64 0.13.1",
"base64 0.22.1",
"bstr",
"bytes",
"camino",
@@ -6422,6 +6419,19 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "serde_html_form"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
dependencies = [
"form_urlencoded",
"indexmap 2.9.0",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_json"
version = "1.0.125"
@@ -6478,15 +6488,17 @@ dependencies = [
[[package]]
name = "serde_with"
version = "2.3.3"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07ff71d2c147a7b57362cead5e22f772cd52f6ab31cfcd9edcd7f6aeb2a0afbe"
checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa"
dependencies = [
"base64 0.13.1",
"base64 0.22.1",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.9.0",
"serde",
"serde_derive",
"serde_json",
"serde_with_macros",
"time",
@@ -6494,9 +6506,9 @@ dependencies = [
[[package]]
name = "serde_with_macros"
version = "2.3.3"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "881b6f881b17d13214e5d494c939ebab463d01264ce1811e9d4ac3a882e7695f"
checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e"
dependencies = [
"darling",
"proc-macro2",
@@ -8567,7 +8579,6 @@ dependencies = [
"anyhow",
"axum",
"axum-core",
"base64 0.13.1",
"base64 0.21.7",
"base64ct",
"bytes",

View File

@@ -71,8 +71,8 @@ aws-credential-types = "1.2.0"
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
axum-extra = { version = "0.10.0", features = ["typed-header"] }
base64 = "0.13.0"
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
base64 = "0.22"
bincode = "1.3"
bindgen = "0.71"
bit_field = "0.10.2"
@@ -171,7 +171,7 @@ sentry = { version = "0.37", default-features = false, features = ["backtrace",
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_path_to_error = "0.1"
serde_with = { version = "2.0", features = [ "base64" ] }
serde_with = { version = "3", features = [ "base64" ] }
serde_assert = "0.5.0"
sha2 = "0.10.2"
signal-hook = "0.3"

3
compute/.gitignore vendored
View File

@@ -3,3 +3,6 @@ etc/neon_collector.yml
etc/neon_collector_autoscaling.yml
etc/sql_exporter.yml
etc/sql_exporter_autoscaling.yml
# Node.js dependencies
node_modules/

View File

@@ -48,3 +48,11 @@ jsonnetfmt-test:
.PHONY: jsonnetfmt-format
jsonnetfmt-format:
jsonnetfmt --in-place $(jsonnet_files)
.PHONY: manifest-schema-validation
manifest-schema-validation: node_modules
node_modules/.bin/jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
node_modules: package.json
npm install
touch node_modules

View File

@@ -1722,11 +1722,29 @@ FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools-plan
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
WORKDIR /home/nonroot
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cargo chef prepare --recipe-path recipe.json
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
USER nonroot
COPY --from=compute-tools-plan /home/nonroot/recipe.json recipe.json
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
--mount=type=cache,uid=1000,target=/home/nonroot/.cargo/git \
--mount=type=cache,uid=1000,target=/home/nonroot/target \
mold -run cargo chef cook --locked --profile release-line-debug-size-lto --recipe-path recipe.json
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \

View File

@@ -0,0 +1,209 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Neon Compute Manifest Schema",
"description": "Schema for Neon compute node configuration manifest",
"type": "object",
"properties": {
"pg_settings": {
"type": "object",
"properties": {
"common": {
"type": "object",
"properties": {
"client_connection_check_interval": {
"type": "string",
"description": "Check for client disconnection interval in milliseconds"
},
"effective_io_concurrency": {
"type": "string",
"description": "Effective IO concurrency setting"
},
"fsync": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to force fsync to disk"
},
"hot_standby": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether hot standby is enabled"
},
"idle_in_transaction_session_timeout": {
"type": "string",
"description": "Timeout for idle transactions in milliseconds"
},
"listen_addresses": {
"type": "string",
"description": "Addresses to listen on"
},
"log_connections": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log connections"
},
"log_disconnections": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log disconnections"
},
"log_temp_files": {
"type": "string",
"description": "Size threshold for logging temporary files in KB"
},
"log_error_verbosity": {
"type": "string",
"enum": ["terse", "verbose", "default"],
"description": "Error logging verbosity level"
},
"log_min_error_statement": {
"type": "string",
"description": "Minimum error level for statement logging"
},
"maintenance_io_concurrency": {
"type": "string",
"description": "Maintenance IO concurrency setting"
},
"max_connections": {
"type": "string",
"description": "Maximum number of connections"
},
"max_replication_flush_lag": {
"type": "string",
"description": "Maximum replication flush lag"
},
"max_replication_slots": {
"type": "string",
"description": "Maximum number of replication slots"
},
"max_replication_write_lag": {
"type": "string",
"description": "Maximum replication write lag"
},
"max_wal_senders": {
"type": "string",
"description": "Maximum number of WAL senders"
},
"max_wal_size": {
"type": "string",
"description": "Maximum WAL size"
},
"neon.unstable_extensions": {
"type": "string",
"description": "List of unstable extensions"
},
"neon.protocol_version": {
"type": "string",
"description": "Neon protocol version"
},
"password_encryption": {
"type": "string",
"description": "Password encryption method"
},
"restart_after_crash": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to restart after crash"
},
"superuser_reserved_connections": {
"type": "string",
"description": "Number of reserved connections for superuser"
},
"synchronous_standby_names": {
"type": "string",
"description": "Names of synchronous standby servers"
},
"wal_keep_size": {
"type": "string",
"description": "WAL keep size"
},
"wal_level": {
"type": "string",
"description": "WAL level"
},
"wal_log_hints": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log hints in WAL"
},
"wal_sender_timeout": {
"type": "string",
"description": "WAL sender timeout in milliseconds"
}
},
"required": [
"client_connection_check_interval",
"effective_io_concurrency",
"fsync",
"hot_standby",
"idle_in_transaction_session_timeout",
"listen_addresses",
"log_connections",
"log_disconnections",
"log_temp_files",
"log_error_verbosity",
"log_min_error_statement",
"maintenance_io_concurrency",
"max_connections",
"max_replication_flush_lag",
"max_replication_slots",
"max_replication_write_lag",
"max_wal_senders",
"max_wal_size",
"neon.unstable_extensions",
"neon.protocol_version",
"password_encryption",
"restart_after_crash",
"superuser_reserved_connections",
"synchronous_standby_names",
"wal_keep_size",
"wal_level",
"wal_log_hints",
"wal_sender_timeout"
]
},
"replica": {
"type": "object",
"properties": {
"hot_standby": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether hot standby is enabled for replicas"
}
},
"required": ["hot_standby"]
},
"per_version": {
"type": "object",
"patternProperties": {
"^1[4-7]$": {
"type": "object",
"properties": {
"common": {
"type": "object",
"properties": {
"io_combine_limit": {
"type": "string",
"description": "IO combine limit"
}
}
},
"replica": {
"type": "object",
"properties": {
"recovery_prefetch": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to enable recovery prefetch for PostgreSQL replicas"
}
}
}
}
}
}
}
},
"required": ["common", "replica", "per_version"]
}
},
"required": ["pg_settings"]
}

View File

@@ -105,17 +105,17 @@ pg_settings:
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
16:
common:
common: {}
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
15:
common:
common: {}
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
14:
common:
replica:
common: {}
replica: {}

37
compute/package-lock.json generated Normal file
View File

@@ -0,0 +1,37 @@
{
"name": "neon-compute",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "neon-compute",
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
},
"node_modules/@sourcemeta/jsonschema": {
"version": "9.3.4",
"resolved": "https://registry.npmjs.org/@sourcemeta/jsonschema/-/jsonschema-9.3.4.tgz",
"integrity": "sha512-hkujfkZAIGXUs4U//We9faZW8LZ4/H9LqagRYsFSulH/VLcKPNhZyCTGg7AhORuzm27zqENvKpnX4g2FzudYFw==",
"cpu": [
"x64",
"arm64"
],
"license": "AGPL-3.0",
"os": [
"darwin",
"linux",
"win32"
],
"bin": {
"jsonschema": "cli.js"
},
"engines": {
"node": ">=16"
},
"funding": {
"url": "https://github.com/sponsors/sourcemeta"
}
}
}
}

7
compute/package.json Normal file
View File

@@ -0,0 +1,7 @@
{
"name": "neon-compute",
"private": true,
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
}

View File

@@ -354,11 +354,6 @@ impl ComputeNode {
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
//
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
// as well. After rolling out this code, we can remove this parameter from control plane.
// In the meantime, double-passing is fine, the last value is applied.
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
let options = match conn_conf.get_options() {
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
@@ -785,7 +780,7 @@ impl ComputeNode {
self.spawn_extension_stats_task();
if pspec.spec.autoprewarm {
self.prewarm_lfc();
self.prewarm_lfc(None);
}
Ok(())
}

View File

@@ -25,11 +25,16 @@ struct EndpointStoragePair {
}
const KEY: &str = "lfc_state";
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
type Error = anyhow::Error;
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
bail!("pspec.endpoint_id missing")
impl EndpointStoragePair {
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
/// If not None, takes precedence over pspec.spec.endpoint_id
fn from_spec_and_endpoint(
pspec: &crate::compute::ParsedSpec,
endpoint_id: Option<String>,
) -> Result<Self> {
let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
let Some(ref endpoint_id) = endpoint_id else {
bail!("pspec.endpoint_id missing, other endpoint_id not provided")
};
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
bail!("pspec.endpoint_storage_addr missing")
@@ -84,7 +89,7 @@ impl ComputeNode {
}
/// Returns false if there is a prewarm request ongoing, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
crate::metrics::LFC_PREWARM_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -97,7 +102,7 @@ impl ComputeNode {
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.prewarm_impl().await else {
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
@@ -109,13 +114,14 @@ impl ComputeNode {
true
}
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
/// from_endpoint: None for endpoint managed by this compute_ctl
fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().try_into()
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
}
async fn prewarm_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
@@ -173,7 +179,7 @@ impl ComputeNode {
}
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();

View File

@@ -2,6 +2,7 @@ use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use axum_extra::extract::OptionalQuery;
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
@@ -16,8 +17,16 @@ pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadS
Json(compute.lfc_offload_state())
}
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
if compute.prewarm_lfc() {
#[derive(serde::Deserialize)]
pub struct PrewarmQuery {
pub from_endpoint: String,
}
pub(in crate::http) async fn prewarm(
compute: Compute,
OptionalQuery(query): OptionalQuery<PrewarmQuery>,
) -> Response {
if compute.prewarm_lfc(query.map(|q| q.from_endpoint)) {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(

View File

@@ -18,7 +18,7 @@ use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint::{ComputeControlPlane, PageserverProtocol};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
@@ -605,6 +605,14 @@ struct EndpointCreateCmdArgs {
#[clap(long, help = "Postgres version")]
pg_version: u32,
/// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
///
/// Specified on creation such that it's retained across reconfiguration and restarts.
///
/// NB: not yet supported by computes.
#[clap(long)]
grpc: bool,
#[clap(
long,
help = "If set, the node will be a hot replica on the specified timeline",
@@ -1451,6 +1459,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
args.internal_http_port,
args.pg_version,
mode,
args.grpc,
!args.update_catalog,
false,
)?;
@@ -1491,13 +1500,20 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
(
vec![(parsed.0, parsed.1.unwrap_or(5432))],
// If caller is telling us what pageserver to use, this is not a tenant which is
// full managed by storage controller, therefore not sharded.
DEFAULT_STRIPE_SIZE,
)
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
};
// If caller is telling us what pageserver to use, this is not a tenant which is
// fully managed by storage controller, therefore not sharded.
(vec![pageserver], DEFAULT_STRIPE_SIZE)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
// to pass these on to postgres.
@@ -1516,11 +1532,20 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.await?;
}
anyhow::Ok((
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
))
let pageserver = if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))?,
shard.listen_grpc_port.expect("no gRPC port"),
)
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr)?,
shard.listen_pg_port,
)
};
anyhow::Ok(pageserver)
}),
)
.await?;
@@ -1575,11 +1600,19 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
vec![(
pageserver.pg_connection_config.host().clone(),
pageserver.pg_connection_config.port(),
)]
let conf = env.get_pageserver_conf(ps_id)?;
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
};
vec![pageserver]
} else {
let storage_controller = StorageController::from_env(env);
storage_controller
@@ -1588,11 +1621,21 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported malformed host"),
shard.listen_pg_port,
)
// Use gRPC if requested.
if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))
.expect("bad hostname"),
shard.listen_grpc_port.expect("no gRPC port"),
)
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr).expect("bad hostname"),
shard.listen_pg_port,
)
}
})
.collect::<Vec<_>>()
};

View File

@@ -37,6 +37,7 @@
//! ```
//!
use std::collections::BTreeMap;
use std::fmt::Display;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::path::PathBuf;
use std::process::Command;
@@ -45,6 +46,8 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use base64::Engine;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use compute_api::requests::{
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
};
@@ -74,7 +77,6 @@ use utils::id::{NodeId, TenantId, TimelineId};
use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;
// contents of a endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
@@ -87,6 +89,7 @@ pub struct EndpointConf {
external_http_port: u16,
internal_http_port: u16,
pg_version: u32,
grpc: bool,
skip_pg_catalog_updates: bool,
reconfigure_concurrency: usize,
drop_subscriptions_before_start: bool,
@@ -164,7 +167,7 @@ impl ComputeControlPlane {
public_key_use: Some(PublicKeyUse::Signature),
key_operations: Some(vec![KeyOperations::Verify]),
key_algorithm: Some(KeyAlgorithm::EdDSA),
key_id: Some(base64::encode_config(key_hash, base64::URL_SAFE_NO_PAD)),
key_id: Some(BASE64_URL_SAFE_NO_PAD.encode(key_hash)),
x509_url: None::<String>,
x509_chain: None::<Vec<String>>,
x509_sha1_fingerprint: None::<String>,
@@ -173,7 +176,7 @@ impl ComputeControlPlane {
algorithm: AlgorithmParameters::OctetKeyPair(OctetKeyPairParameters {
key_type: OctetKeyPairType::OctetKeyPair,
curve: EllipticCurve::Ed25519,
x: base64::encode_config(public_key, base64::URL_SAFE_NO_PAD),
x: BASE64_URL_SAFE_NO_PAD.encode(public_key),
}),
}],
})
@@ -190,6 +193,7 @@ impl ComputeControlPlane {
internal_http_port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
grpc: bool,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
) -> Result<Arc<Endpoint>> {
@@ -224,6 +228,7 @@ impl ComputeControlPlane {
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates,
drop_subscriptions_before_start,
grpc,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
@@ -242,6 +247,7 @@ impl ComputeControlPlane {
internal_http_port,
pg_port,
pg_version,
grpc,
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
@@ -296,6 +302,8 @@ pub struct Endpoint {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mode: ComputeMode,
/// If true, the endpoint should use gRPC to communicate with Pageservers.
pub grpc: bool,
// port and address of the Postgres server and `compute_ctl`'s HTTP APIs
pub pg_address: SocketAddr,
@@ -331,7 +339,7 @@ pub enum EndpointStatus {
RunningNoPidfile,
}
impl std::fmt::Display for EndpointStatus {
impl Display for EndpointStatus {
fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
let s = match self {
Self::Running => "running",
@@ -343,6 +351,29 @@ impl std::fmt::Display for EndpointStatus {
}
}
/// Protocol used to connect to a Pageserver.
#[derive(Clone, Copy, Debug)]
pub enum PageserverProtocol {
Libpq,
Grpc,
}
impl PageserverProtocol {
/// Returns the URL scheme for the protocol, used in connstrings.
pub fn scheme(&self) -> &'static str {
match self {
Self::Libpq => "postgresql",
Self::Grpc => "grpc",
}
}
}
impl Display for PageserverProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.scheme())
}
}
impl Endpoint {
fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
if !entry.file_type()?.is_dir() {
@@ -378,6 +409,7 @@ impl Endpoint {
mode: conf.mode,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
grpc: conf.grpc,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
reconfigure_concurrency: conf.reconfigure_concurrency,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
@@ -606,10 +638,10 @@ impl Endpoint {
}
}
fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String {
pageservers
.iter()
.map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
.map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}"))
.collect::<Vec<_>>()
.join(",")
}
@@ -654,7 +686,7 @@ impl Endpoint {
endpoint_storage_addr: String,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(Host, u16)>,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
remote_ext_base_url: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
@@ -939,10 +971,12 @@ impl Endpoint {
pub async fn reconfigure(
&self,
mut pageservers: Vec<(Host, u16)>,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
let (mut spec, compute_ctl_config) = {
let config_path = self.endpoint_path().join("config.json");
let file = std::fs::File::open(config_path)?;
@@ -954,25 +988,7 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
// If we weren't given explicit pageservers, query the storage controller
if pageservers.is_empty() {
let storage_controller = StorageController::from_env(&self.env);
let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
pageservers = locate_result
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
)
})
.collect::<Vec<_>>();
}
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstr.is_empty());
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);

View File

@@ -16,6 +16,7 @@ use std::time::Duration;
use anyhow::{Context, bail};
use camino::Utf8PathBuf;
use pageserver_api::config::{DEFAULT_GRPC_LISTEN_PORT, DEFAULT_HTTP_LISTEN_PORT};
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -252,9 +253,10 @@ impl PageServerNode {
// the storage controller
let metadata_path = datadir.join("metadata.json");
let (_http_host, http_port) =
let http_host = "localhost".to_string();
let (_, http_port) =
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
let http_port = http_port.unwrap_or(9898);
let http_port = http_port.unwrap_or(DEFAULT_HTTP_LISTEN_PORT);
let https_port = match self.conf.listen_https_addr.as_ref() {
Some(https_addr) => {
@@ -265,6 +267,13 @@ impl PageServerNode {
None => None,
};
let (mut grpc_host, mut grpc_port) = (None, None);
if let Some(grpc_addr) = &self.conf.listen_grpc_addr {
let (_, port) = parse_host_port(grpc_addr).expect("Unable to parse listen_grpc_addr");
grpc_host = Some("localhost".to_string());
grpc_port = Some(port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT));
}
// Intentionally hand-craft JSON: this acts as an implicit format compat test
// in case the pageserver-side structure is edited, and reflects the real life
// situation: the metadata is written by some other script.
@@ -273,7 +282,9 @@ impl PageServerNode {
serde_json::to_vec(&pageserver_api::config::NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: self.pg_connection_config.port(),
http_host: "localhost".to_string(),
grpc_host,
grpc_port,
http_host,
http_port,
https_port,
other: HashMap::from([(

View File

@@ -36,6 +36,10 @@ enum Command {
listen_pg_addr: String,
#[arg(long)]
listen_pg_port: u16,
#[arg(long)]
listen_grpc_addr: Option<String>,
#[arg(long)]
listen_grpc_port: Option<u16>,
#[arg(long)]
listen_http_addr: String,
@@ -418,6 +422,8 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,
@@ -431,6 +437,8 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,

View File

@@ -12,6 +12,7 @@ pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LI
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
use std::collections::HashMap;
use std::fmt::Display;
use std::num::{NonZeroU64, NonZeroUsize};
use std::str::FromStr;
use std::time::Duration;
@@ -24,16 +25,17 @@ use utils::logging::LogFormat;
use crate::models::{ImageCompressionAlgorithm, LsnLease};
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
// as a separate structure. This information is not neeed by the pageserver
// as a separate structure. This information is not needed by the pageserver
// itself, it is only used for registering the pageserver with the control
// plane and/or storage controller.
//
#[derive(PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
pub struct NodeMetadata {
#[serde(rename = "host")]
pub postgres_host: String,
#[serde(rename = "port")]
pub postgres_port: u16,
pub grpc_host: Option<String>,
pub grpc_port: Option<u16>,
pub http_host: String,
pub http_port: u16,
pub https_port: Option<u16>,
@@ -44,6 +46,23 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
impl Display for NodeMetadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"postgresql://{}:{} ",
self.postgres_host, self.postgres_port
)?;
if let Some(grpc_host) = &self.grpc_host {
let grpc_port = self.grpc_port.unwrap_or_default();
write!(f, "grpc://{grpc_host}:{grpc_port} ")?;
}
write!(f, "http://{}:{} ", self.http_host, self.http_port)?;
write!(f, "other:{:?}", self.other)?;
Ok(())
}
}
/// PostHog integration config.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
@@ -337,16 +356,21 @@ pub struct TimelineImportConfig {
pub struct BasebackupCacheConfig {
#[serde(with = "humantime_serde")]
pub cleanup_period: Duration,
// FIXME: Support max_size_bytes.
// pub max_size_bytes: usize,
pub max_size_entries: i64,
/// Maximum total size of basebackup cache entries on disk in bytes.
/// The cache may slightly exceed this limit because we do not know
/// the exact size of the cache entry untill it's written to disk.
pub max_total_size_bytes: u64,
// TODO(diko): support max_entry_size_bytes.
// pub max_entry_size_bytes: u64,
pub max_size_entries: usize,
}
impl Default for BasebackupCacheConfig {
fn default() -> Self {
Self {
cleanup_period: Duration::from_secs(60),
// max_size_bytes: 1024 * 1024 * 1024, // 1 GiB
max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB
// max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB
max_size_entries: 1000,
}
}

View File

@@ -14,6 +14,8 @@ fn test_node_metadata_v1_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: None,
@@ -37,6 +39,35 @@ fn test_node_metadata_v2_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),
other: HashMap::new(),
}
)
}
#[test]
fn test_node_metadata_v3_backward_compatibilty() {
let v3 = serde_json::to_vec(&serde_json::json!({
"host": "localhost",
"port": 23,
"grpc_host": "localhost",
"grpc_port": 51,
"http_host": "localhost",
"http_port": 42,
"https_port": 123,
}));
assert_eq!(
serde_json::from_slice::<NodeMetadata>(&v3.unwrap()).unwrap(),
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: Some("localhost".to_string()),
grpc_port: Some(51),
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),

View File

@@ -52,6 +52,8 @@ pub struct NodeRegisterRequest {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -101,6 +103,8 @@ pub struct TenantLocateResponseShard {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -152,6 +156,8 @@ pub struct NodeDescribeResponse {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
}
#[derive(Serialize, Deserialize, Debug)]

View File

@@ -9,7 +9,7 @@ use utils::id::{NodeId, TimelineId};
use crate::controller_api::NodeRegisterRequest;
use crate::models::{LocationConfigMode, ShardImportStatus};
use crate::shard::TenantShardId;
use crate::shard::{ShardStripeSize, TenantShardId};
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
/// startup.
@@ -36,6 +36,10 @@ pub struct ReAttachResponseTenant {
/// Default value only for backward compat: this field should be set
#[serde(default = "default_mode")]
pub mode: LocationConfigMode,
// Default value only for backward compat: this field should be set
#[serde(default = "ShardStripeSize::default")]
pub stripe_size: ShardStripeSize,
}
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponse {

View File

@@ -55,9 +55,16 @@ impl FeatureResolverBackgroundLoop {
continue;
}
};
let feature_store = FeatureStore::new_with_flags(resp.flags);
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
let project_id = this.posthog_client.config.project_id.parse::<u64>().ok();
match FeatureStore::new_with_flags(resp.flags, project_id) {
Ok(feature_store) => {
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
}
Err(e) => {
tracing::warn!("Cannot process feature flag spec: {}", e);
}
}
}
tracing::info!("PostHog feature resolver stopped");
}

View File

@@ -39,6 +39,9 @@ pub struct LocalEvaluationResponse {
#[derive(Deserialize)]
pub struct LocalEvaluationFlag {
#[allow(dead_code)]
id: u64,
team_id: u64,
key: String,
filters: LocalEvaluationFlagFilters,
active: bool,
@@ -107,17 +110,32 @@ impl FeatureStore {
}
}
pub fn new_with_flags(flags: Vec<LocalEvaluationFlag>) -> Self {
pub fn new_with_flags(
flags: Vec<LocalEvaluationFlag>,
project_id: Option<u64>,
) -> Result<Self, &'static str> {
let mut store = Self::new();
store.set_flags(flags);
store
store.set_flags(flags, project_id)?;
Ok(store)
}
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
pub fn set_flags(
&mut self,
flags: Vec<LocalEvaluationFlag>,
project_id: Option<u64>,
) -> Result<(), &'static str> {
self.flags.clear();
for flag in flags {
if let Some(project_id) = project_id {
if flag.team_id != project_id {
return Err(
"Retrieved a spec with different project id, wrong config? Discarding the feature flags.",
);
}
}
self.flags.insert(flag.key.clone(), flag);
}
Ok(())
}
/// Generate a consistent hash for a user ID (e.g., tenant ID).
@@ -534,6 +552,13 @@ impl PostHogClient {
})
}
/// Check if the server API key is a feature flag secure API key. This key can only be
/// used to fetch the feature flag specs and can only be used on a undocumented API
/// endpoint.
fn is_feature_flag_secure_api_key(&self) -> bool {
self.config.server_api_key.starts_with("phs_")
}
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
@@ -547,10 +572,22 @@ impl PostHogClient {
) -> anyhow::Result<LocalEvaluationResponse> {
// BASE_URL/api/projects/:project_id/feature_flags/local_evaluation
// with bearer token of self.server_api_key
let url = format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.config.private_api_url, self.config.project_id
);
// OR
// BASE_URL/api/feature_flag/local_evaluation/
// with bearer token of feature flag specific self.server_api_key
let url = if self.is_feature_flag_secure_api_key() {
// The new feature local evaluation secure API token
format!(
"{}/api/feature_flag/local_evaluation",
self.config.private_api_url
)
} else {
// The old personal API token
format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.config.private_api_url, self.config.project_id
)
};
let response = self
.client
.get(url)
@@ -803,7 +840,7 @@ mod tests {
fn evaluate_multivariate() {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
store.set_flags(response.flags, None).unwrap();
// This lacks the required properties and cannot be evaluated.
let variant =
@@ -873,7 +910,7 @@ mod tests {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
store.set_flags(response.flags, None).unwrap();
// This lacks the required properties and cannot be evaluated.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new());
@@ -929,7 +966,7 @@ mod tests {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
store.set_flags(response.flags, None).unwrap();
// This lacks the required properties and cannot be evaluated.
let variant =

View File

@@ -5,7 +5,7 @@ edition = "2024"
license = "MIT/Apache-2.0"
[dependencies]
base64 = "0.20"
base64.workspace = true
byteorder.workspace = true
bytes.workspace = true
fallible-iterator.workspace = true

View File

@@ -3,6 +3,8 @@
use std::fmt::Write;
use std::{io, iter, mem, str};
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use rand::{self, Rng};
use sha2::digest::FixedOutput;
@@ -226,7 +228,7 @@ impl ScramSha256 {
let (client_key, server_key) = match password {
Credentials::Password(password) => {
let salt = match base64::decode(parsed.salt) {
let salt = match BASE64_STANDARD.decode(parsed.salt) {
Ok(salt) => salt,
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
};
@@ -255,7 +257,7 @@ impl ScramSha256 {
let mut cbind_input = vec![];
cbind_input.extend(channel_binding.gs2_header().as_bytes());
cbind_input.extend(channel_binding.cbind_data());
let cbind_input = base64::encode(&cbind_input);
let cbind_input = BASE64_STANDARD.encode(&cbind_input);
self.message.clear();
write!(&mut self.message, "c={},r={}", cbind_input, parsed.nonce).unwrap();
@@ -272,7 +274,12 @@ impl ScramSha256 {
*proof ^= signature;
}
write!(&mut self.message, ",p={}", base64::encode(client_proof)).unwrap();
write!(
&mut self.message,
",p={}",
BASE64_STANDARD.encode(client_proof)
)
.unwrap();
self.state = State::Finish {
server_key,
@@ -306,7 +313,7 @@ impl ScramSha256 {
ServerFinalMessage::Verifier(verifier) => verifier,
};
let verifier = match base64::decode(verifier) {
let verifier = match BASE64_STANDARD.decode(verifier) {
Ok(verifier) => verifier,
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
};

View File

@@ -6,6 +6,8 @@
//! side. This is good because it ensures the cleartext password won't
//! end up in logs pg_stat displays, etc.
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use rand::RngCore;
use sha2::digest::FixedOutput;
@@ -83,8 +85,8 @@ pub(crate) async fn scram_sha_256_salt(
format!(
"SCRAM-SHA-256${}:{}${}:{}",
SCRAM_DEFAULT_ITERATIONS,
base64::encode(salt),
base64::encode(stored_key),
base64::encode(server_key)
BASE64_STANDARD.encode(salt),
BASE64_STANDARD.encode(stored_key),
BASE64_STANDARD.encode(server_key)
)
}

View File

@@ -824,6 +824,7 @@ impl RemoteStorage for AzureBlobStorage {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
_complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
let msg = "PLEASE NOTE: Azure Blob storage time-travel recovery may not work as expected "
.to_string()

View File

@@ -440,6 +440,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError>;
}
@@ -651,22 +652,23 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
match self {
Self::LocalFs(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
Self::AwsS3(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
Self::AzureBlob(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
Self::Unreliable(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
}

View File

@@ -610,6 +610,7 @@ impl RemoteStorage for LocalFs {
_timestamp: SystemTime,
_done_if_after: SystemTime,
_cancel: &CancellationToken,
_complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
Err(TimeTravelError::Unimplemented)
}

View File

@@ -981,22 +981,16 @@ impl RemoteStorage for S3Bucket {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
let kind = RequestKind::TimeTravel;
let permit = self.permit(kind, cancel).await?;
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
let mode = ListingMode::NoDelimiter;
let version_listing = self
.list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
.list_versions_with_permit(&permit, prefix, mode, complexity_limit, cancel)
.await
.map_err(|err| match err {
DownloadError::Other(e) => TimeTravelError::Other(e),

View File

@@ -240,11 +240,12 @@ impl RemoteStorage for UnreliableWrapper {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
.map_err(TimeTravelError::Other)?;
self.inner
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
}

View File

@@ -157,7 +157,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// No changes after recovery to t2 (no-op)
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t2, t_final, &cancel)
.time_travel_recover(None, t2, t_final, &cancel, None)
.await?;
let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t2: {t2_files_recovered:?}");
@@ -173,7 +173,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t1: path1 is back, path2 has the old content
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t1, t_final, &cancel)
.time_travel_recover(None, t1, t_final, &cancel, None)
.await?;
let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t1: {t1_files_recovered:?}");
@@ -189,7 +189,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t0: everything is gone except for path1
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t0, t_final, &cancel)
.time_travel_recover(None, t0, t_final, &cancel, None)
.await?;
let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t0: {t0_files_recovered:?}");

View File

@@ -20,7 +20,7 @@
//!
//! # local timeline dir
//! ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
//! grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
//! grep "__" | cargo run --release --bin pagectl draw-timeline > out.svg
//!
//! # Layer map dump from `/v1/tenant/$TENANT/timeline/$TIMELINE/layer`
//! (jq -r '.historic_layers[] | .layer_file_name' | cargo run -p pagectl draw-timeline) < layer-map.json > out.svg
@@ -81,7 +81,11 @@ fn build_coordinate_compression_map<T: Ord + Copy>(coords: Vec<T>) -> BTreeMap<T
fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
let split: Vec<&str> = name.split("__").collect();
let keys: Vec<&str> = split[0].split('-').collect();
let mut lsns: Vec<&str> = split[1].split('-').collect();
// Remove the temporary file extension, e.g., remove the `.d20a.___temp` part from the following filename:
// 000000067F000040490000404A00441B0000-000000067F000040490000404A00441B4000__000043483A34CE00.d20a.___temp
let lsns = split[1].split('.').collect::<Vec<&str>>()[0];
let mut lsns: Vec<&str> = lsns.split('-').collect();
// The current format of the layer file name: 000000067F0000000400000B150100000000-000000067F0000000400000D350100000000__00000000014B7AC8-v1-00000001

View File

@@ -13,7 +13,7 @@ use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::parse_filename;
use crate::layer_map_analyzer::{LayerFile, parse_filename};
#[derive(Subcommand)]
pub(crate) enum LayerCmd {
@@ -38,6 +38,8 @@ pub(crate) enum LayerCmd {
/// The id from list-layer command
id: usize,
},
/// Dump all information of a layer file locally
DumpLayerLocal { path: PathBuf },
RewriteSummary {
layer_file_path: Utf8PathBuf,
#[clap(long)]
@@ -131,15 +133,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
}
for (idx, layer_file) in to_print {
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
print_layer_file(idx, &layer_file);
}
Ok(())
}
@@ -159,16 +153,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
let layer = layer?;
if let Ok(layer_file) = parse_filename(&layer.file_name().into_string().unwrap()) {
if *id == idx {
// TODO(chi): dedup code
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
print_layer_file(idx, &layer_file);
if layer_file.is_delta {
read_delta_file(layer.path(), &ctx).await?;
@@ -183,6 +168,18 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
}
Ok(())
}
LayerCmd::DumpLayerLocal { path } => {
if let Ok(layer_file) = parse_filename(path.file_name().unwrap().to_str().unwrap()) {
print_layer_file(0, &layer_file);
if layer_file.is_delta {
read_delta_file(path, &ctx).await?;
} else {
read_image_file(path, &ctx).await?;
}
}
Ok(())
}
LayerCmd::RewriteSummary {
layer_file_path,
new_tenant_id,
@@ -247,3 +244,15 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
}
}
}
fn print_layer_file(idx: usize, layer_file: &LayerFile) {
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
}

View File

@@ -176,9 +176,11 @@ async fn main() -> anyhow::Result<()> {
let config = RemoteStorageConfig::from_toml_str(&cmd.config_toml_str)?;
let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
let cancel = CancellationToken::new();
// Complexity limit: as we are running this command locally, we should have a lot of memory available, and we do not
// need to limit the number of versions we are going to delete.
storage
.unwrap()
.time_travel_recover(Some(&prefix), timestamp, done_if_after, &cancel)
.time_travel_recover(Some(&prefix), timestamp, done_if_after, &cancel, None)
.await?;
}
Commands::Key(dkc) => dkc.execute(),

View File

@@ -102,10 +102,10 @@ message CheckRelExistsResponse {
bool exists = 1;
}
// Requests a base backup at a given LSN.
// Requests a base backup.
message GetBaseBackupRequest {
// The LSN to fetch a base backup at.
ReadLsn read_lsn = 1;
// The LSN to fetch the base backup at. 0 or absent means the latest LSN known to the Pageserver.
uint64 lsn = 1;
// If true, logical replication slots will not be created.
bool replica = 2;
}

View File

@@ -26,7 +26,7 @@ use utils::lsn::Lsn;
use crate::proto;
/// A protocol error. Typically returned via try_from() or try_into().
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Clone, Debug)]
pub enum ProtocolError {
#[error("field '{0}' has invalid value '{1}'")]
Invalid(&'static str, String),
@@ -182,33 +182,28 @@ impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
}
}
/// Requests a base backup at a given LSN.
/// Requests a base backup.
#[derive(Clone, Copy, Debug)]
pub struct GetBaseBackupRequest {
/// The LSN to fetch a base backup at.
pub read_lsn: ReadLsn,
/// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver.
pub lsn: Option<Lsn>,
/// If true, logical replication slots will not be created.
pub replica: bool,
}
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
fn from(pb: proto::GetBaseBackupRequest) -> Self {
Self {
lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
replica: pb.replica,
})
}
}
}
impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
fn from(request: GetBaseBackupRequest) -> Self {
Self {
read_lsn: Some(request.read_lsn.into()),
lsn: request.lsn.unwrap_or_default().0,
replica: request.replica,
}
}
@@ -422,6 +417,39 @@ impl From<GetPageResponse> for proto::GetPageResponse {
}
}
impl GetPageResponse {
/// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a
/// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a
/// GetPageResponse with a non-OK status code instead.
#[allow(clippy::result_large_err)]
pub fn try_from_status(
status: tonic::Status,
request_id: RequestID,
) -> Result<Self, tonic::Status> {
// We shouldn't see an OK status here, because we're emitting an error.
debug_assert_ne!(status.code(), tonic::Code::Ok);
if status.code() == tonic::Code::Ok {
return Err(tonic::Status::internal(format!(
"unexpected OK status: {status:?}",
)));
}
// If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request
// error and we should return a tonic::Status to terminate the stream.
let Ok(status_code) = status.code().try_into() else {
return Err(status);
};
// Return a GetPageResponse for the status.
Ok(Self {
request_id,
status_code,
reason: Some(status.message().to_string()),
page_images: Vec::new(),
})
}
}
/// A GetPage response status code.
///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
@@ -485,8 +513,42 @@ impl From<GetPageStatusCode> for i32 {
}
}
impl TryFrom<tonic::Code> for GetPageStatusCode {
type Error = tonic::Code;
fn try_from(code: tonic::Code) -> Result<Self, Self::Error> {
use tonic::Code;
let status_code = match code {
Code::Ok => Self::Ok,
// These are per-request errors, which should be returned as GetPageResponses.
Code::AlreadyExists => Self::InvalidRequest,
Code::DataLoss => Self::InternalError,
Code::FailedPrecondition => Self::InvalidRequest,
Code::InvalidArgument => Self::InvalidRequest,
Code::Internal => Self::InternalError,
Code::NotFound => Self::NotFound,
Code::OutOfRange => Self::InvalidRequest,
Code::ResourceExhausted => Self::SlowDown,
// These should terminate the stream by returning a tonic::Status.
Code::Aborted
| Code::Cancelled
| Code::DeadlineExceeded
| Code::PermissionDenied
| Code::Unauthenticated
| Code::Unavailable
| Code::Unimplemented
| Code::Unknown => return Err(code),
};
Ok(status_code)
}
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
// shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetRelSizeRequest {
pub read_lsn: ReadLsn,
pub rel: RelTag,
@@ -530,6 +592,7 @@ impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
}
/// Requests an SLRU segment. Only valid on shard 0, other shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetSlruSegmentRequest {
pub read_lsn: ReadLsn,
pub kind: SlruKind,

View File

@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
@@ -18,7 +19,10 @@ use utils::{
use crate::{
basebackup::send_basebackup_tarball,
context::{DownloadBehavior, RequestContext},
metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
metrics::{
BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ,
BASEBACKUP_CACHE_SIZE,
},
task_mgr::TaskKind,
tenant::{
Timeline,
@@ -35,8 +39,13 @@ pub struct BasebackupPrepareRequest {
pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
#[derive(Clone)]
struct CacheEntry {
/// LSN at which the basebackup was taken.
lsn: Lsn,
/// Size of the basebackup archive in bytes.
size_bytes: u64,
}
/// BasebackupCache stores cached basebackup archives for timelines on local disk.
///
@@ -52,21 +61,12 @@ type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
/// and ~1 RPS for get requests.
pub struct BasebackupCache {
data_dir: Utf8PathBuf,
config: BasebackupCacheConfig,
tenant_manager: Arc<TenantManager>,
remove_entry_sender: BasebackupRemoveEntrySender,
entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
cancel: CancellationToken,
entries: std::sync::Mutex<HashMap<TenantTimelineId, CacheEntry>>,
read_hit_count: GenericCounter<AtomicU64>,
read_miss_count: GenericCounter<AtomicU64>,
read_err_count: GenericCounter<AtomicU64>,
prepare_ok_count: GenericCounter<AtomicU64>,
prepare_skip_count: GenericCounter<AtomicU64>,
prepare_err_count: GenericCounter<AtomicU64>,
}
impl BasebackupCache {
@@ -82,35 +82,32 @@ impl BasebackupCache {
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) -> Arc<Self> {
let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
let enabled = config.is_some();
let cache = Arc::new(BasebackupCache {
data_dir,
config: config.unwrap_or_default(),
tenant_manager,
remove_entry_sender,
entries: std::sync::Mutex::new(HashMap::new()),
cancel,
read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
});
if enabled {
runtime_handle.spawn(
cache
.clone()
.background(prepare_receiver, remove_entry_receiver),
);
if let Some(config) = config {
let background = BackgroundTask {
c: cache.clone(),
config,
tenant_manager,
cancel,
entry_count: 0,
total_size_bytes: 0,
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
};
runtime_handle.spawn(background.run(prepare_receiver));
}
cache
@@ -128,7 +125,7 @@ impl BasebackupCache {
) -> Option<tokio::fs::File> {
// Fast path. Check if the entry exists using the in-memory state.
let tti = TenantTimelineId::new(tenant_id, timeline_id);
if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) {
self.read_miss_count.inc();
return None;
}
@@ -166,6 +163,42 @@ impl BasebackupCache {
self.data_dir
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
}
/// The background task that does the job to prepare basebackups
/// and manage the cache entries on disk.
/// It is a separate struct from BasebackupCache to allow holding
/// a mutable reference to this state without a mutex lock,
/// while BasebackupCache is referenced by the clients.
struct BackgroundTask {
c: Arc<BasebackupCache>,
config: BasebackupCacheConfig,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
/// Number of the entries in the cache.
/// This counter is used for metrics and applying cache limits.
/// It generally should be equal to c.entries.len(), but it's calculated
/// pessimistically for abnormal situations: if we encountered some errors
/// during removing the entry from disk, we won't decrement this counter to
/// make sure that we don't exceed the limit with "trashed" files on the disk.
/// It will also count files in the data_dir that are not valid cache entries.
entry_count: usize,
/// Total size of all the entries on the disk.
/// This counter is used for metrics and applying cache limits.
/// Similar to entry_count, it is calculated pessimistically for abnormal situations.
total_size_bytes: u64,
prepare_ok_count: GenericCounter<AtomicU64>,
prepare_skip_count: GenericCounter<AtomicU64>,
prepare_err_count: GenericCounter<AtomicU64>,
}
impl BackgroundTask {
fn tmp_dir(&self) -> Utf8PathBuf {
self.c.data_dir.join("tmp")
}
fn entry_tmp_path(
&self,
@@ -173,9 +206,8 @@ impl BasebackupCache {
timeline_id: TimelineId,
lsn: Lsn,
) -> Utf8PathBuf {
self.data_dir
.join("tmp")
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
self.tmp_dir()
.join(BasebackupCache::entry_filename(tenant_id, timeline_id, lsn))
}
fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
@@ -194,18 +226,21 @@ impl BasebackupCache {
Some((tenant_id, timeline_id, lsn))
}
async fn cleanup(&self) -> anyhow::Result<()> {
// Cleanup tmp directory.
let tmp_dir = self.data_dir.join("tmp");
let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?;
while let Some(dir_entry) = tmp_dir.next_entry().await? {
if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await {
tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e);
}
// Recreate the tmp directory to clear all files in it.
async fn clean_tmp_dir(&self) -> anyhow::Result<()> {
let tmp_dir = self.tmp_dir();
if tmp_dir.exists() {
tokio::fs::remove_dir_all(&tmp_dir).await?;
}
tokio::fs::create_dir_all(&tmp_dir).await?;
Ok(())
}
// Remove outdated entries.
let entries_old = self.entries.lock().unwrap().clone();
async fn cleanup(&mut self) -> anyhow::Result<()> {
self.clean_tmp_dir().await?;
// Leave only up-to-date entries.
let entries_old = self.c.entries.lock().unwrap().clone();
let mut entries_new = HashMap::new();
for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
if !tenant_shard_id.is_shard_zero() {
@@ -218,43 +253,42 @@ impl BasebackupCache {
for timeline in tenant.list_timelines() {
let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
if let Some(&entry_lsn) = entries_old.get(&tti) {
if timeline.get_last_record_lsn() <= entry_lsn {
entries_new.insert(tti, entry_lsn);
if let Some(entry) = entries_old.get(&tti) {
if timeline.get_last_record_lsn() <= entry.lsn {
entries_new.insert(tti, entry.clone());
}
}
}
}
for (&tti, &lsn) in entries_old.iter() {
// Try to remove all entries that are not up-to-date.
for (&tti, entry) in entries_old.iter() {
if !entries_new.contains_key(&tti) {
self.remove_entry_sender
.send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
.unwrap();
self.try_remove_entry(tti.tenant_id, tti.timeline_id, entry)
.await;
}
}
BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
*self.entries.lock().unwrap() = entries_new;
// Note: BackgroundTask is the only writer for self.c.entries,
// so it couldn't have been modified concurrently.
*self.c.entries.lock().unwrap() = entries_new;
Ok(())
}
async fn on_startup(&self) -> anyhow::Result<()> {
// Create data_dir and tmp directory if they do not exist.
tokio::fs::create_dir_all(&self.data_dir.join("tmp"))
async fn on_startup(&mut self) -> anyhow::Result<()> {
// Create data_dir if it does not exist.
tokio::fs::create_dir_all(&self.c.data_dir)
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to create basebackup cache data_dir {:?}: {:?}",
self.data_dir,
e
)
})?;
.context("Failed to create basebackup cache data directory")?;
self.clean_tmp_dir()
.await
.context("Failed to clean tmp directory")?;
// Read existing entries from the data_dir and add them to in-memory state.
let mut entries = HashMap::new();
let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
let mut entries = HashMap::<TenantTimelineId, CacheEntry>::new();
let mut dir = tokio::fs::read_dir(&self.c.data_dir).await?;
while let Some(dir_entry) = dir.next_entry().await? {
let filename = dir_entry.file_name();
@@ -263,33 +297,43 @@ impl BasebackupCache {
continue;
}
let size_bytes = dir_entry
.metadata()
.await
.map_err(|e| {
anyhow::anyhow!("Failed to read metadata for file {:?}: {:?}", filename, e)
})?
.len();
self.entry_count += 1;
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
self.total_size_bytes += size_bytes;
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
let Some((tenant_id, timeline_id, lsn)) = parsed else {
tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
continue;
};
let cur_entry = CacheEntry { lsn, size_bytes };
let tti = TenantTimelineId::new(tenant_id, timeline_id);
use std::collections::hash_map::Entry::*;
match entries.entry(tti) {
Occupied(mut entry) => {
let entry_lsn = *entry.get();
let found_entry = entry.get();
// Leave only the latest entry, remove the old one.
if lsn < entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
lsn,
))?;
} else if lsn > entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
entry_lsn,
))?;
entry.insert(lsn);
if cur_entry.lsn < found_entry.lsn {
self.try_remove_entry(tenant_id, timeline_id, &cur_entry)
.await;
} else if cur_entry.lsn > found_entry.lsn {
self.try_remove_entry(tenant_id, timeline_id, found_entry)
.await;
entry.insert(cur_entry);
} else {
// Two different filenames parsed to the same timline_id and LSN.
// Should never happen.
@@ -300,22 +344,17 @@ impl BasebackupCache {
}
}
Vacant(entry) => {
entry.insert(lsn);
entry.insert(cur_entry);
}
}
}
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
*self.entries.lock().unwrap() = entries;
*self.c.entries.lock().unwrap() = entries;
Ok(())
}
async fn background(
self: Arc<Self>,
mut prepare_receiver: BasebackupPrepareReceiver,
mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
) {
async fn run(mut self, mut prepare_receiver: BasebackupPrepareReceiver) {
// Panic in the background is a safe fallback.
// It will drop receivers and the cache will be effectively disabled.
self.on_startup()
@@ -338,11 +377,6 @@ impl BasebackupCache {
continue;
}
}
Some(req) = remove_entry_receiver.recv() => {
if let Err(e) = tokio::fs::remove_file(req).await {
tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
}
}
_ = cleanup_ticker.tick() => {
self.cleanup().await.unwrap_or_else(|e| {
tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
@@ -356,6 +390,67 @@ impl BasebackupCache {
}
}
/// Try to remove an entry from disk.
/// The caller is responsible for removing the entry from the in-memory state.
/// Updates size counters and corresponding metrics.
/// Ignores the filesystem errors as not-so-important, but the size counters
/// are not decremented in this case, so the file will continue to be counted
/// towards the size limits.
async fn try_remove_entry(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
entry: &CacheEntry,
) {
let entry_path = self.c.entry_path(tenant_id, timeline_id, entry.lsn);
match tokio::fs::remove_file(&entry_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::warn!(
"Failed to remove basebackup cache file for tenant {} timeline {} LSN {}: {:#}",
tenant_id,
timeline_id,
entry.lsn,
e
);
return;
}
}
self.entry_count -= 1;
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
self.total_size_bytes -= entry.size_bytes;
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
}
/// Insert the cache entry into in-memory state and update the size counters.
/// Assumes that the file for the entry already exists on disk.
/// If the entry already exists with previous LSN, it will be removed.
async fn upsert_entry(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
entry: CacheEntry,
) {
let tti = TenantTimelineId::new(tenant_id, timeline_id);
self.entry_count += 1;
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
self.total_size_bytes += entry.size_bytes;
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
let old_entry = self.c.entries.lock().unwrap().insert(tti, entry);
if let Some(old_entry) = old_entry {
self.try_remove_entry(tenant_id, timeline_id, &old_entry)
.await;
}
}
/// Prepare a basebackup for the given timeline.
///
/// If the basebackup already exists with a higher LSN or the timeline already
@@ -364,7 +459,7 @@ impl BasebackupCache {
/// The basebackup is prepared in a temporary directory and then moved to the final
/// location to make the operation atomic.
async fn prepare_basebackup(
&self,
&mut self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
req_lsn: Lsn,
@@ -378,30 +473,44 @@ impl BasebackupCache {
let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
// TODO(diko): I don't think we will hit the limit,
// but if we do, it makes sense to try to evict oldest entries. here
if self.entry_count >= self.config.max_size_entries {
tracing::info!(
%tenant_shard_id,
%timeline_id,
%req_lsn,
"Basebackup cache is full (max_size_entries), skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
if self.total_size_bytes >= self.config.max_total_size_bytes {
tracing::info!(
%tenant_shard_id,
%timeline_id,
%req_lsn,
"Basebackup cache is full (max_total_size_bytes), skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
{
let entries = self.entries.lock().unwrap();
if let Some(&entry_lsn) = entries.get(&tti) {
if entry_lsn >= req_lsn {
let entries = self.c.entries.lock().unwrap();
if let Some(entry) = entries.get(&tti) {
if entry.lsn >= req_lsn {
tracing::info!(
%timeline_id,
%req_lsn,
%entry_lsn,
%entry.lsn,
"Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
if entries.len() as i64 >= self.config.max_size_entries {
tracing::info!(
%timeline_id,
%req_lsn,
"Basebackup cache is full, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
let tenant = self
@@ -437,47 +546,52 @@ impl BasebackupCache {
.prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
.await;
if let Err(err) = res {
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
match tokio::fs::remove_file(&entry_tmp_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
let entry = match res {
Ok(entry) => entry,
Err(err) => {
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
match tokio::fs::remove_file(&entry_tmp_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
}
}
return Err(err);
}
return Err(err);
}
};
// Move the tmp file to the final location atomically.
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
// The tmp file is fsynced, so it's guaranteed that we will not have a partial file
// in the main directory.
// It's not necessary to fsync the inode after renaming, because the worst case is that
// the rename operation will be rolled back on the disk failure, the entry will disappear
// from the main directory, and the entry access will cause a cache miss.
let entry_path = self
.c
.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
let mut entries = self.entries.lock().unwrap();
if let Some(old_lsn) = entries.insert(tti, req_lsn) {
// Remove the old entry if it exists.
self.remove_entry_sender
.send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
.unwrap();
}
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
self.upsert_entry(tenant_shard_id.tenant_id, timeline_id, entry)
.await;
self.prepare_ok_count.inc();
Ok(())
}
/// Prepares a basebackup in a temporary file.
/// Guarantees that the tmp file is fsynced before returning.
async fn prepare_basebackup_tmp(
&self,
emptry_tmp_path: &Utf8Path,
entry_tmp_path: &Utf8Path,
timeline: &Arc<Timeline>,
req_lsn: Lsn,
) -> anyhow::Result<()> {
) -> anyhow::Result<CacheEntry> {
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
let ctx = ctx.with_scope_timeline(timeline);
let file = tokio::fs::File::create(emptry_tmp_path).await?;
let file = tokio::fs::File::create(entry_tmp_path).await?;
let mut writer = BufWriter::new(file);
let mut encoder = GzipEncoder::with_quality(
@@ -513,6 +627,12 @@ impl BasebackupCache {
writer.flush().await?;
writer.into_inner().sync_all().await?;
Ok(())
// TODO(diko): we can count it via Writer wrapper instead of a syscall.
let size_bytes = tokio::fs::metadata(entry_tmp_path).await?.len();
Ok(CacheEntry {
lsn: req_lsn,
size_bytes,
})
}
}

View File

@@ -159,14 +159,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
Ok(m) => {
// Since we run one time at startup, be generous in our logging and
// dump all metadata.
tracing::info!(
"Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
m.postgres_host,
m.postgres_port,
m.http_host,
m.http_port,
m.other
);
tracing::info!("Loaded node metadata: {m}");
let az_id = {
let az_id_from_metadata = m
@@ -195,6 +188,8 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
node_id: conf.id,
listen_pg_addr: m.postgres_host,
listen_pg_port: m.postgres_port,
listen_grpc_addr: m.grpc_host,
listen_grpc_port: m.grpc_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
listen_https_port: m.https_port,

View File

@@ -73,6 +73,7 @@ use crate::tenant::remote_timeline_client::{
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
use crate::tenant::timeline::{
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
@@ -1451,7 +1452,10 @@ async fn timeline_layer_scan_disposable_keys(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download)
.with_scope_timeline(&timeline);
let guard = timeline.layers.read().await;
let guard = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let Some(layer) = guard.try_get_from_key(&layer_name.clone().into()) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Layer {tenant_shard_id}/{timeline_id}/{layer_name} not found").into(),

View File

@@ -4428,18 +4428,16 @@ pub(crate) static BASEBACKUP_CACHE_PREPARE: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_basebackup_cache_entries_total",
"Number of entries in the basebackup cache"
)
.expect("failed to define a metric")
});
// FIXME: Support basebackup cache size metrics.
#[allow(dead_code)]
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_basebackup_cache_size_bytes",
"Total size of all basebackup cache entries on disk in bytes"
)

View File

@@ -14,7 +14,7 @@ use std::{io, str};
use anyhow::{Context as _, anyhow, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::{Buf, BytesMut};
use bytes::{Buf as _, BufMut as _, BytesMut};
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use itertools::Itertools;
@@ -623,60 +623,6 @@ enum PageStreamError {
BadRequest(Cow<'static, str>),
}
impl PageStreamError {
/// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status
/// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a
/// convenience method for use from a get_pages gRPC stream.
#[allow(clippy::result_large_err)]
fn into_get_page_response(
self,
request_id: page_api::RequestID,
) -> Result<proto::GetPageResponse, tonic::Status> {
use page_api::GetPageStatusCode;
use tonic::Code;
// We dispatch to Into<tonic::Status> first, and then map it to a GetPageResponse.
let status: tonic::Status = self.into();
let status_code = match status.code() {
// We shouldn't see an OK status here, because we're emitting an error.
Code::Ok => {
debug_assert_ne!(status.code(), Code::Ok);
return Err(tonic::Status::internal(format!(
"unexpected OK status: {status:?}",
)));
}
// These are per-request errors, returned as GetPageResponses.
Code::AlreadyExists => GetPageStatusCode::InvalidRequest,
Code::DataLoss => GetPageStatusCode::InternalError,
Code::FailedPrecondition => GetPageStatusCode::InvalidRequest,
Code::InvalidArgument => GetPageStatusCode::InvalidRequest,
Code::Internal => GetPageStatusCode::InternalError,
Code::NotFound => GetPageStatusCode::NotFound,
Code::OutOfRange => GetPageStatusCode::InvalidRequest,
Code::ResourceExhausted => GetPageStatusCode::SlowDown,
// These should terminate the stream.
Code::Aborted => return Err(status),
Code::Cancelled => return Err(status),
Code::DeadlineExceeded => return Err(status),
Code::PermissionDenied => return Err(status),
Code::Unauthenticated => return Err(status),
Code::Unavailable => return Err(status),
Code::Unimplemented => return Err(status),
Code::Unknown => return Err(status),
};
Ok(page_api::GetPageResponse {
request_id,
status_code,
reason: Some(status.message().to_string()),
page_images: Vec::new(),
}
.into())
}
}
impl From<PageStreamError> for tonic::Status {
fn from(err: PageStreamError) -> Self {
use tonic::Code;
@@ -3438,8 +3384,8 @@ impl GrpcPageServiceHandler {
/// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
///
/// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse
/// with an appropriate status code instead.
/// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
/// GetPageResponse with an appropriate status code to avoid terminating the stream.
///
/// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
/// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
@@ -3456,7 +3402,7 @@ impl GrpcPageServiceHandler {
let ctx = ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request, decorate the span, and convert it to a Pagestream request.
let req: page_api::GetPageRequest = req.try_into()?;
let req = page_api::GetPageRequest::try_from(req)?;
span_record!(
req_id = %req.request_id,
@@ -3467,7 +3413,7 @@ impl GrpcPageServiceHandler {
);
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard
let effective_lsn = match PageServerHandler::effective_request_lsn(
let effective_lsn = PageServerHandler::effective_request_lsn(
&timeline,
timeline.get_last_record_lsn(),
req.read_lsn.request_lsn,
@@ -3475,10 +3421,7 @@ impl GrpcPageServiceHandler {
.not_modified_since_lsn
.unwrap_or(req.read_lsn.request_lsn),
&latest_gc_cutoff_lsn,
) {
Ok(lsn) => lsn,
Err(err) => return err.into_get_page_response(req.request_id),
};
)?;
let mut batch = SmallVec::with_capacity(req.block_numbers.len());
for blkno in req.block_numbers {
@@ -3535,7 +3478,7 @@ impl GrpcPageServiceHandler {
"unexpected response: {resp:?}"
)));
}
Err(err) => return err.err.into_get_page_response(req.request_id),
Err(err) => return Err(err.err.into()),
};
}
@@ -3601,42 +3544,44 @@ impl proto::PageService for GrpcPageServiceHandler {
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);
// Validate the request, decorate the span, and wait for the LSN to arrive.
//
// TODO: this requires a read LSN, is that ok?
// Validate the request and decorate the span.
Self::ensure_shard_zero(&timeline)?;
if timeline.is_archived() == Some(true) {
return Err(tonic::Status::failed_precondition("timeline is archived"));
}
let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
let req: page_api::GetBaseBackupRequest = req.into_inner().into();
span_record!(lsn=%req.read_lsn);
span_record!(lsn=?req.lsn);
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
timeline
.wait_lsn(
req.read_lsn.request_lsn,
WaitLsnWaiter::PageService,
WaitLsnTimeout::Default,
&ctx,
)
.await?;
timeline
.check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn)
.map_err(|err| {
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
})?;
// Wait for the LSN to arrive, if given.
if let Some(lsn) = req.lsn {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
timeline
.wait_lsn(
lsn,
WaitLsnWaiter::PageService,
WaitLsnTimeout::Default,
&ctx,
)
.await?;
timeline
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
.map_err(|err| {
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
})?;
}
// Spawn a task to run the basebackup.
//
// TODO: do we need to support full base backups, for debugging?
// TODO: do we need to support full base backups, for debugging? This also requires passing
// the prev_lsn parameter.
let span = Span::current();
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
let jh = tokio::spawn(async move {
let result = basebackup::send_basebackup_tarball(
&mut simplex_write,
&timeline,
Some(req.read_lsn.request_lsn),
req.lsn,
None,
false,
req.replica,
@@ -3652,20 +3597,21 @@ impl proto::PageService for GrpcPageServiceHandler {
// Emit chunks of size CHUNK_SIZE.
let chunks = async_stream::try_stream! {
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE);
loop {
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
})?;
// If we read 0 bytes, either the chunk is full or the stream is closed.
if n == 0 {
if chunk.is_empty() {
break;
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
loop {
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
})?;
if n == 0 {
break; // full chunk or closed stream
}
yield proto::GetBaseBackupResponseChunk::from(chunk.clone().freeze());
chunk.clear();
}
let chunk = chunk.into_inner().freeze();
if chunk.is_empty() {
break;
}
yield proto::GetBaseBackupResponseChunk::from(chunk);
}
// Wait for the basebackup task to exit and check for errors.
jh.await.map_err(|err| {
@@ -3742,9 +3688,16 @@ impl proto::PageService for GrpcPageServiceHandler {
.await?
.downgrade();
while let Some(req) = reqs.message().await? {
yield Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
let req_id = req.request_id;
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
.instrument(span.clone()) // propagate request span
.await?
.await;
yield match result {
Ok(resp) => resp,
// Convert per-request errors to GetPageResponses as appropriate, or terminate
// the stream with a tonic::Status.
Err(err) => page_api::GetPageResponse::try_from_status(err, req_id)?.into(),
}
}
};

View File

@@ -51,6 +51,7 @@ use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
use timeline::import_pgdata::ImportingTimeline;
use timeline::layer_manager::LayerManagerLockHolder;
use timeline::offload::{OffloadError, offload_timeline};
use timeline::{
CompactFlags, CompactOptions, CompactionError, PreviousHeatmap, ShutdownMode, import_pgdata,
@@ -1315,7 +1316,7 @@ impl TenantShard {
ancestor.is_some()
|| timeline
.layers
.read()
.read(LayerManagerLockHolder::LoadLayerMap)
.await
.layer_map()
.expect(
@@ -2643,7 +2644,7 @@ impl TenantShard {
}
let layer_names = tline
.layers
.read()
.read(LayerManagerLockHolder::Testing)
.await
.layer_map()
.unwrap()
@@ -3158,7 +3159,12 @@ impl TenantShard {
for timeline in &compact {
// Collect L0 counts. Can't await while holding lock above.
if let Ok(lm) = timeline.layers.read().await.layer_map() {
if let Ok(lm) = timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await
.layer_map()
{
l0_counts.insert(timeline.timeline_id, lm.level0_deltas().len());
}
}
@@ -4900,7 +4906,7 @@ impl TenantShard {
}
let layer_names = tline
.layers
.read()
.read(LayerManagerLockHolder::Testing)
.await
.layer_map()
.unwrap()
@@ -6970,7 +6976,7 @@ mod tests {
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
let layer_map = tline.layers.read().await;
let layer_map = tline.layers.read(LayerManagerLockHolder::Testing).await;
let level0_deltas = layer_map
.layer_map()?
.level0_deltas()
@@ -7206,7 +7212,7 @@ mod tests {
let lsn = Lsn(0x10);
let inserted = bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
let guard = tline.layers.read().await;
let guard = tline.layers.read(LayerManagerLockHolder::Testing).await;
let lm = guard.layer_map()?;
lm.dump(true, &ctx).await?;
@@ -8234,12 +8240,23 @@ mod tests {
tline.freeze_and_flush().await?; // force create a delta layer
}
let before_num_l0_delta_files =
tline.layers.read().await.layer_map()?.level0_deltas().len();
let before_num_l0_delta_files = tline
.layers
.read(LayerManagerLockHolder::Testing)
.await
.layer_map()?
.level0_deltas()
.len();
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();
let after_num_l0_delta_files = tline
.layers
.read(LayerManagerLockHolder::Testing)
.await
.layer_map()?
.level0_deltas()
.len();
assert!(
after_num_l0_delta_files < before_num_l0_delta_files,

View File

@@ -61,8 +61,8 @@ pub(crate) struct LocationConf {
/// The detailed shard identity. This structure is already scoped within
/// a TenantShardId, but we need the full ShardIdentity to enable calculating
/// key->shard mappings.
// TODO(vlad): Remove this default once all configs have a shard identity on disk.
#[serde(default = "ShardIdentity::unsharded")]
#[serde(skip_serializing_if = "ShardIdentity::is_unsharded")]
pub(crate) shard: ShardIdentity,
/// The pan-cluster tenant configuration, the same on all locations
@@ -149,7 +149,12 @@ impl LocationConf {
/// For use when attaching/re-attaching: update the generation stored in this
/// structure. If we were in a secondary state, promote to attached (posession
/// of a fresh generation implies this).
pub(crate) fn attach_in_generation(&mut self, mode: AttachmentMode, generation: Generation) {
pub(crate) fn attach_in_generation(
&mut self,
mode: AttachmentMode,
generation: Generation,
stripe_size: ShardStripeSize,
) {
match &mut self.mode {
LocationMode::Attached(attach_conf) => {
attach_conf.generation = generation;
@@ -163,6 +168,8 @@ impl LocationConf {
})
}
}
self.shard.stripe_size = stripe_size;
}
pub(crate) fn try_from(conf: &'_ models::LocationConfig) -> anyhow::Result<Self> {

View File

@@ -51,6 +51,7 @@ use crate::tenant::config::{
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::inmemory_layer;
use crate::tenant::timeline::ShutdownMode;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{
AttachedTenantConf, GcError, LoadConfigError, SpawnMode, TenantShard, TenantState,
};
@@ -128,7 +129,7 @@ pub(crate) enum ShardSelector {
///
/// This represents the subset of a LocationConfig that we receive during re-attach.
pub(crate) enum TenantStartupMode {
Attached((AttachmentMode, Generation)),
Attached((AttachmentMode, Generation, ShardStripeSize)),
Secondary,
}
@@ -142,15 +143,21 @@ impl TenantStartupMode {
match (rart.mode, rart.r#gen) {
(LocationConfigMode::Detached, _) => None,
(LocationConfigMode::Secondary, _) => Some(Self::Secondary),
(LocationConfigMode::AttachedMulti, Some(g)) => {
Some(Self::Attached((AttachmentMode::Multi, Generation::new(g))))
}
(LocationConfigMode::AttachedSingle, Some(g)) => {
Some(Self::Attached((AttachmentMode::Single, Generation::new(g))))
}
(LocationConfigMode::AttachedStale, Some(g)) => {
Some(Self::Attached((AttachmentMode::Stale, Generation::new(g))))
}
(LocationConfigMode::AttachedMulti, Some(g)) => Some(Self::Attached((
AttachmentMode::Multi,
Generation::new(g),
rart.stripe_size,
))),
(LocationConfigMode::AttachedSingle, Some(g)) => Some(Self::Attached((
AttachmentMode::Single,
Generation::new(g),
rart.stripe_size,
))),
(LocationConfigMode::AttachedStale, Some(g)) => Some(Self::Attached((
AttachmentMode::Stale,
Generation::new(g),
rart.stripe_size,
))),
_ => {
tracing::warn!(
"Received invalid re-attach state for tenant {}: {rart:?}",
@@ -318,9 +325,11 @@ fn emergency_generations(
Some((
*tid,
match &lc.mode {
LocationMode::Attached(alc) => {
TenantStartupMode::Attached((alc.attach_mode, alc.generation))
}
LocationMode::Attached(alc) => TenantStartupMode::Attached((
alc.attach_mode,
alc.generation,
ShardStripeSize::default(),
)),
LocationMode::Secondary(_) => TenantStartupMode::Secondary,
},
))
@@ -364,7 +373,7 @@ async fn init_load_generations(
.iter()
.flat_map(|(id, start_mode)| {
match start_mode {
TenantStartupMode::Attached((_mode, generation)) => Some(generation),
TenantStartupMode::Attached((_mode, generation, _stripe_size)) => Some(generation),
TenantStartupMode::Secondary => None,
}
.map(|gen_| (*id, *gen_))
@@ -584,7 +593,7 @@ pub async fn init_tenant_mgr(
location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
}
}
Some(TenantStartupMode::Attached((attach_mode, generation))) => {
Some(TenantStartupMode::Attached((attach_mode, generation, stripe_size))) => {
let old_gen_higher = match &location_conf.mode {
LocationMode::Attached(AttachedLocationConfig {
generation: old_generation,
@@ -608,7 +617,7 @@ pub async fn init_tenant_mgr(
// local disk content: demote to secondary rather than detaching.
location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
} else {
location_conf.attach_in_generation(*attach_mode, *generation);
location_conf.attach_in_generation(*attach_mode, *generation, *stripe_size);
}
}
}
@@ -1658,7 +1667,10 @@ impl TenantManager {
let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
for timeline in timelines.values() {
tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
let layers = timeline.layers.read().await;
let layers = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
for layer in layers.likely_resident_layers() {
let relative_path = layer

View File

@@ -1,6 +1,7 @@
//! Helper functions to upload files to remote storage with a RemoteStorage
use std::io::{ErrorKind, SeekFrom};
use std::num::NonZeroU32;
use std::time::SystemTime;
use anyhow::{Context, bail};
@@ -228,11 +229,25 @@ pub(crate) async fn time_travel_recover_tenant(
let timelines_path = super::remote_timelines_path(tenant_shard_id);
prefixes.push(timelines_path);
}
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
for prefix in &prefixes {
backoff::retry(
|| async {
storage
.time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
.time_travel_recover(
Some(prefix),
timestamp,
done_if_after,
cancel,
COMPLEXITY_LIMIT,
)
.await
},
|e| !matches!(e, TimeTravelError::Other(_)),

View File

@@ -1635,6 +1635,7 @@ pub(crate) mod test {
use crate::tenant::disk_btree::tests::TestDisk;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{TenantShard, Timeline};
/// Construct an index for a fictional delta layer and and then
@@ -2002,7 +2003,7 @@ pub(crate) mod test {
let initdb_layer = timeline
.layers
.read()
.read(crate::tenant::timeline::layer_manager::LayerManagerLockHolder::Testing)
.await
.likely_resident_layers()
.next()
@@ -2078,7 +2079,7 @@ pub(crate) mod test {
let new_layer = timeline
.layers
.read()
.read(LayerManagerLockHolder::Testing)
.await
.likely_resident_layers()
.find(|&x| x != &initdb_layer)

View File

@@ -10,6 +10,7 @@ use super::*;
use crate::context::DownloadBehavior;
use crate::tenant::harness::{TenantHarness, test_img};
use crate::tenant::storage_layer::{IoConcurrency, LayerVisibilityHint};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
/// Used in tests to advance a future to wanted await point, and not futher.
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
@@ -59,7 +60,7 @@ async fn smoke_test() {
// there to avoid the timeline being illegally empty
let (layer, dummy_layer) = {
let mut layers = {
let layers = timeline.layers.read().await;
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -215,7 +216,7 @@ async fn smoke_test() {
// Simulate GC removing our test layer.
{
let mut g = timeline.layers.write().await;
let mut g = timeline.layers.write(LayerManagerLockHolder::Testing).await;
let layers = &[layer];
g.open_mut().unwrap().finish_gc_timeline(layers);
@@ -261,7 +262,7 @@ async fn evict_and_wait_on_wanted_deleted() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -305,7 +306,7 @@ async fn evict_and_wait_on_wanted_deleted() {
// assert that once we remove the `layer` from the layer map and drop our reference,
// the deletion of the layer in remote_storage happens.
{
let mut layers = timeline.layers.write().await;
let mut layers = timeline.layers.write(LayerManagerLockHolder::Testing).await;
layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
}
@@ -347,7 +348,7 @@ fn read_wins_pending_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -480,7 +481,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -655,7 +656,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -741,7 +742,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -862,7 +863,7 @@ async fn eviction_cancellation_on_drop() {
let (evicted_layer, not_evicted) = {
let mut layers = {
let mut guard = timeline.layers.write().await;
let mut guard = timeline.layers.write(LayerManagerLockHolder::Testing).await;
let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
// remove the layers from layermap
guard.open_mut().unwrap().finish_gc_timeline(&layers);

View File

@@ -35,7 +35,11 @@ use fail::fail_point;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use handle::ShardTimelineId;
use layer_manager::Shutdown;
use layer_manager::{
LayerManagerLockHolder, LayerManagerReadGuard, LayerManagerWriteGuard, LockedLayerManager,
Shutdown,
};
use offload::OffloadError;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
@@ -82,7 +86,6 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use self::delete::DeleteTimelineFlow;
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
@@ -181,13 +184,13 @@ impl std::fmt::Display for ImageLayerCreationMode {
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_rlock<T>(rlock: tokio::sync::RwLockReadGuard<T>) {
fn drop_layer_manager_rlock(rlock: LayerManagerReadGuard<'_>) {
drop(rlock)
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
fn drop_layer_manager_wlock(rlock: LayerManagerWriteGuard<'_>) {
drop(rlock)
}
@@ -241,7 +244,7 @@ pub struct Timeline {
///
/// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
pub(crate) layers: tokio::sync::RwLock<LayerManager>,
pub(crate) layers: LockedLayerManager,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
@@ -1535,7 +1538,10 @@ impl Timeline {
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub(crate) async fn layer_size_sum(&self) -> u64 {
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
guard.layer_size_sum()
}
@@ -1845,7 +1851,7 @@ impl Timeline {
// time, and this was missed.
// if write_guard.is_none() { return; }
let Ok(layers_guard) = self.layers.try_read() else {
let Ok(layers_guard) = self.layers.try_read(LayerManagerLockHolder::TryFreezeLayer) else {
// Don't block if the layer lock is busy
return;
};
@@ -2158,7 +2164,7 @@ impl Timeline {
if let ShutdownMode::FreezeAndFlush = mode {
let do_flush = if let Some((open, frozen)) = self
.layers
.read()
.read(LayerManagerLockHolder::Shutdown)
.await
.layer_map()
.map(|lm| (lm.open_layer.is_some(), lm.frozen_layers.len()))
@@ -2262,7 +2268,10 @@ impl Timeline {
// Allow any remaining in-memory layers to do cleanup -- until that, they hold the gate
// open.
let mut write_guard = self.write_lock.lock().await;
self.layers.write().await.shutdown(&mut write_guard);
self.layers
.write(LayerManagerLockHolder::Shutdown)
.await
.shutdown(&mut write_guard);
}
// Finally wait until any gate-holders are complete.
@@ -2365,7 +2374,10 @@ impl Timeline {
&self,
reset: LayerAccessStatsReset,
) -> Result<LayerMapInfo, layer_manager::Shutdown> {
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layer_map = guard.layer_map()?;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
@@ -3232,7 +3244,7 @@ impl Timeline {
/// Initialize with an empty layer map. Used when creating a new timeline.
pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
let mut layers = self.layers.try_write().expect(
let mut layers = self.layers.try_write(LayerManagerLockHolder::Init).expect(
"in the context where we call this function, no other task has access to the object",
);
layers
@@ -3252,7 +3264,10 @@ impl Timeline {
use init::Decision::*;
use init::{Discovered, DismissedLayer};
let mut guard = self.layers.write().await;
let mut guard = self
.layers
.write(LayerManagerLockHolder::LoadLayerMap)
.await;
let timer = self.metrics.load_layer_map_histo.start_timer();
@@ -3869,7 +3884,10 @@ impl Timeline {
&self,
layer_name: &LayerName,
) -> Result<Option<Layer>, layer_manager::Shutdown> {
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layer = guard
.layer_map()?
.iter_historic_layers()
@@ -3902,7 +3920,10 @@ impl Timeline {
return None;
}
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GenerateHeatmap)
.await;
// Firstly, if there's any heatmap left over from when this location
// was a secondary, take that into account. Keep layers that are:
@@ -4000,7 +4021,10 @@ impl Timeline {
}
pub(super) async fn generate_unarchival_heatmap(&self, end_lsn: Lsn) -> PreviousHeatmap {
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GenerateHeatmap)
.await;
let now = SystemTime::now();
let mut heatmap_layers = Vec::default();
@@ -4342,7 +4366,7 @@ impl Timeline {
query: &VersionedKeySpaceQuery,
) -> Result<LayerFringe, GetVectoredError> {
let mut fringe = LayerFringe::new();
let guard = self.layers.read().await;
let guard = self.layers.read(LayerManagerLockHolder::GetPage).await;
match query {
VersionedKeySpaceQuery::Uniform { keyspace, lsn } => {
@@ -4445,7 +4469,7 @@ impl Timeline {
// required for correctness, but avoids visiting extra layers
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read().await;
let guard = timeline.layers.read(LayerManagerLockHolder::GetPage).await;
guard.update_search_fringe(&unmapped_keyspace, cont_lsn, &mut fringe)?;
// It's safe to drop the layer map lock after planning the next round of reads.
@@ -4555,7 +4579,10 @@ impl Timeline {
_guard: &tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let mut guard = self
.layers
.write(LayerManagerLockHolder::GetLayerForWrite)
.await;
let last_record_lsn = self.get_last_record_lsn();
ensure!(
@@ -4597,7 +4624,10 @@ impl Timeline {
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
) -> Result<u64, FlushLayerError> {
let frozen = {
let mut guard = self.layers.write().await;
let mut guard = self
.layers
.write(LayerManagerLockHolder::TryFreezeLayer)
.await;
guard
.open_mut()?
.try_freeze_in_memory_layer(at, &self.last_freeze_at, write_lock, &self.metrics)
@@ -4638,7 +4668,12 @@ impl Timeline {
ctx: &RequestContext,
) {
// Subscribe to L0 delta layer updates, for compaction backpressure.
let mut watch_l0 = match self.layers.read().await.layer_map() {
let mut watch_l0 = match self
.layers
.read(LayerManagerLockHolder::FlushLoop)
.await
.layer_map()
{
Ok(lm) => lm.watch_level0_deltas(),
Err(Shutdown) => return,
};
@@ -4675,7 +4710,7 @@ impl Timeline {
// Fetch the next layer to flush, if any.
let (layer, l0_count, frozen_count, frozen_size) = {
let layers = self.layers.read().await;
let layers = self.layers.read(LayerManagerLockHolder::FlushLoop).await;
let Ok(lm) = layers.layer_map() else {
info!("dropping out of flush loop for timeline shutdown");
return;
@@ -4971,7 +5006,10 @@ impl Timeline {
// in-memory layer from the map now. The flushed layer is stored in
// the mapping in `create_delta_layer`.
{
let mut guard = self.layers.write().await;
let mut guard = self
.layers
.write(LayerManagerLockHolder::FlushFrozenLayer)
.await;
guard.open_mut()?.finish_flush_l0_layer(
delta_layer_to_add.as_ref(),
@@ -5186,7 +5224,7 @@ impl Timeline {
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read().await;
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let Ok(layers) = guard.layer_map() else {
return false;
};
@@ -5604,7 +5642,7 @@ impl Timeline {
if let ImageLayerCreationMode::Force = mode {
// When forced to create image layers, we might try and create them where they already
// exist. This mode is only used in tests/debug.
let layers = self.layers.read().await;
let layers = self.layers.read(LayerManagerLockHolder::Compaction).await;
if layers.contains_key(&PersistentLayerKey {
key_range: img_range.clone(),
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
@@ -5729,7 +5767,7 @@ impl Timeline {
let image_layers = batch_image_writer.finish(self, ctx).await?;
let mut guard = self.layers.write().await;
let mut guard = self.layers.write(LayerManagerLockHolder::Compaction).await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right
// now they are being scheduled outside of write lock; current way is inconsistent with
@@ -5737,7 +5775,7 @@ impl Timeline {
guard
.open_mut()?
.track_new_image_layers(&image_layers, &self.metrics);
drop_wlock(guard);
drop_layer_manager_wlock(guard);
let duration = timer.stop_and_record();
// Creating image layers may have caused some previously visible layers to be covered
@@ -6107,7 +6145,7 @@ impl Timeline {
layers_to_remove: &[Layer],
) -> Result<(), CompactionError> {
let mut guard = tokio::select! {
guard = self.layers.write() => guard,
guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard,
_ = self.cancel.cancelled() => {
return Err(CompactionError::ShuttingDown);
}
@@ -6156,7 +6194,7 @@ impl Timeline {
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
drop_wlock(guard);
drop_layer_manager_wlock(guard);
Ok(())
}
@@ -6166,7 +6204,7 @@ impl Timeline {
mut replace_layers: Vec<(Layer, ResidentLayer)>,
mut drop_layers: Vec<Layer>,
) -> Result<(), CompactionError> {
let mut guard = self.layers.write().await;
let mut guard = self.layers.write(LayerManagerLockHolder::Compaction).await;
// Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want
// to avoid double-removing, and avoid rewriting something that was removed.
@@ -6517,7 +6555,10 @@ impl Timeline {
// 5. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too agressive and avoidable
let mut guard = self.layers.write().await;
let mut guard = self
.layers
.write(LayerManagerLockHolder::GarbageCollection)
.await;
let layers = guard.layer_map()?;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -6819,7 +6860,10 @@ impl Timeline {
use pageserver_api::models::DownloadRemoteLayersTaskState;
let remaining = {
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let Ok(lm) = guard.layer_map() else {
// technically here we could look into iterating accessible layers, but downloading
// all layers of a shutdown timeline makes no sense regardless.
@@ -6925,7 +6969,7 @@ impl Timeline {
impl Timeline {
/// Returns non-remote layers for eviction.
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let guard = self.layers.read().await;
let guard = self.layers.read(LayerManagerLockHolder::Eviction).await;
let mut max_layer_size: Option<u64> = None;
let resident_layers = guard
@@ -7026,7 +7070,7 @@ impl Timeline {
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created image layer {}", image_layer.local_path());
{
let mut guard = self.layers.write().await;
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
guard
.open_mut()
.unwrap()
@@ -7089,7 +7133,7 @@ impl Timeline {
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created delta layer {}", delta_layer.local_path());
{
let mut guard = self.layers.write().await;
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
guard
.open_mut()
.unwrap()
@@ -7184,7 +7228,7 @@ impl Timeline {
// Link the layer to the layer map
{
let mut guard = self.layers.write().await;
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
let layer_map = guard.open_mut().unwrap();
layer_map.force_insert_in_memory_layer(Arc::new(layer));
}
@@ -7201,7 +7245,7 @@ impl Timeline {
io_concurrency: IoConcurrency,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read().await;
let guard = self.layers.read(LayerManagerLockHolder::Testing).await;
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
@@ -7230,7 +7274,7 @@ impl Timeline {
self: &Arc<Timeline>,
) -> anyhow::Result<Vec<super::storage_layer::PersistentLayerKey>> {
let mut layers = Vec::new();
let guard = self.layers.read().await;
let guard = self.layers.read(LayerManagerLockHolder::Testing).await;
for layer in guard.layer_map()?.iter_historic_layers() {
layers.push(layer.key());
}
@@ -7342,7 +7386,7 @@ impl TimelineWriter<'_> {
let l0_count = self
.tl
.layers
.read()
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await
.layer_map()?
.level0_deltas()
@@ -7561,6 +7605,7 @@ mod tests {
use crate::tenant::harness::{TenantHarness, test_img};
use crate::tenant::layer_map::LayerMap;
use crate::tenant::storage_layer::{Layer, LayerName, LayerVisibilityHint};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::{DeltaLayerTestDesc, EvictionError};
use crate::tenant::{PreviousHeatmap, Timeline};
@@ -7668,7 +7713,7 @@ mod tests {
// Evict all the layers and stash the old heatmap in the timeline.
// This simulates a migration to a cold secondary location.
let guard = timeline.layers.read().await;
let guard = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let mut all_layers = Vec::new();
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
@@ -7790,7 +7835,7 @@ mod tests {
})));
// Evict all the layers in the previous heatmap
let guard = timeline.layers.read().await;
let guard = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
layer.evict_and_wait(forever).await.unwrap();
@@ -7853,7 +7898,10 @@ mod tests {
}
async fn find_some_layer(timeline: &Timeline) -> Layer {
let layers = timeline.layers.read().await;
let layers = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let desc = layers
.layer_map()
.unwrap()

View File

@@ -4,6 +4,7 @@ use std::ops::Range;
use utils::lsn::Lsn;
use super::Timeline;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
#[derive(serde::Serialize)]
pub(crate) struct RangeAnalysis {
@@ -24,7 +25,10 @@ impl Timeline {
let num_of_l0;
let all_layer_files = {
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
num_of_l0 = guard.layer_map().unwrap().level0_deltas().len();
guard.all_persistent_layers()
};

View File

@@ -9,7 +9,7 @@ use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::layer_manager::LayerManager;
use super::layer_manager::{LayerManagerLockHolder, LayerManagerReadGuard};
use super::{
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
@@ -62,7 +62,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::tasks::log_compaction_error;
use crate::tenant::timeline::{
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
ResidentLayer, drop_rlock,
ResidentLayer, drop_layer_manager_rlock,
};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
@@ -314,7 +314,10 @@ impl GcCompactionQueue {
.unwrap_or(Lsn::INVALID);
let layers = {
let guard = timeline.layers.read().await;
let guard = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
@@ -408,7 +411,10 @@ impl GcCompactionQueue {
timeline: &Arc<Timeline>,
lsn: Lsn,
) -> Result<u64, CompactionError> {
let guard = timeline.layers.read().await;
let guard = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layer_map = guard.layer_map()?;
let layers = layer_map.iter_historic_layers().collect_vec();
let mut size = 0;
@@ -851,7 +857,7 @@ impl KeyHistoryRetention {
}
let layer_generation;
{
let guard = tline.layers.read().await;
let guard = tline.layers.read(LayerManagerLockHolder::Compaction).await;
if !guard.contains_key(key) {
return false;
}
@@ -1282,7 +1288,10 @@ impl Timeline {
// We do the repartition on the L0-L1 boundary. All data below the boundary
// are compacted by L0 with low read amplification, thus making the `repartition`
// function run fast.
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
guard
.all_persistent_layers()
.iter()
@@ -1461,7 +1470,7 @@ impl Timeline {
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
let layers = self.layers.read().await;
let layers = self.layers.read(LayerManagerLockHolder::Compaction).await;
let layers_iter = layers.layer_map()?.iter_historic_layers();
let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
for layer_desc in layers_iter {
@@ -1722,7 +1731,10 @@ impl Timeline {
// are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
// Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
// they will be subject to L0->L1 compaction in the near future.
let layer_manager = self.layers.read().await;
let layer_manager = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layer_map = layer_manager.layer_map()?;
let readable_points = {
@@ -1775,7 +1787,7 @@ impl Timeline {
};
let begin = tokio::time::Instant::now();
let phase1_layers_locked = self.layers.read().await;
let phase1_layers_locked = self.layers.read(LayerManagerLockHolder::Compaction).await;
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
@@ -1803,7 +1815,7 @@ impl Timeline {
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
async fn compact_level0_phase1<'a>(
self: &'a Arc<Self>,
guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
guard: LayerManagerReadGuard<'a>,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
@@ -2029,7 +2041,7 @@ impl Timeline {
holes
};
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
drop_rlock(guard);
drop_layer_manager_rlock(guard);
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
@@ -2469,7 +2481,7 @@ impl Timeline {
// Find the top of the historical layers
let end_lsn = {
let guard = self.layers.read().await;
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let layers = guard.layer_map()?;
let l0_deltas = layers.level0_deltas();
@@ -3008,7 +3020,7 @@ impl Timeline {
}
split_key_ranges.sort();
let all_layers = {
let guard = self.layers.read().await;
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
@@ -3112,12 +3124,12 @@ impl Timeline {
.await?;
let jobs_len = jobs.len();
for (idx, job) in jobs.into_iter().enumerate() {
info!(
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
idx + 1,
jobs_len
);
let sub_compaction_progress = format!("{}/{}", idx + 1, jobs_len);
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
.instrument(info_span!(
"sub_compaction",
sub_compaction_progress = sub_compaction_progress
))
.await?;
}
if jobs_len == 0 {
@@ -3185,7 +3197,10 @@ impl Timeline {
// 1. If a layer is in the selection, all layers below it are in the selection.
// 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
let job_desc = {
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GarbageCollection)
.await;
let layers = guard.layer_map()?;
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
@@ -3956,7 +3971,10 @@ impl Timeline {
// First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
let all_layers = {
let guard = self.layers.read().await;
let guard = self
.layers
.read(LayerManagerLockHolder::GarbageCollection)
.await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
@@ -4020,7 +4038,10 @@ impl Timeline {
let update_guard = self.gc_compaction_layer_update_lock.write().await;
// Acquiring the update guard ensures current read operations end and new read operations are blocked.
// TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
let mut guard = self.layers.write().await;
let mut guard = self
.layers
.write(LayerManagerLockHolder::GarbageCollection)
.await;
guard
.open_mut()?
.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
@@ -4088,7 +4109,11 @@ impl TimelineAdaptor {
pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
let layers_to_delete = {
let guard = self.timeline.layers.read().await;
let guard = self
.timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await;
self.layers_to_delete
.iter()
.map(|x| guard.get_from_desc(x))
@@ -4133,7 +4158,11 @@ impl CompactionJobExecutor for TimelineAdaptor {
) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
self.flush_updates().await?;
let guard = self.timeline.layers.read().await;
let guard = self
.timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await;
let layer_map = guard.layer_map()?;
let result = layer_map
@@ -4172,7 +4201,11 @@ impl CompactionJobExecutor for TimelineAdaptor {
// this is a lot more complex than a simple downcast...
if layer.is_delta() {
let l = {
let guard = self.timeline.layers.read().await;
let guard = self
.timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await;
guard.get_from_desc(layer)
};
let result = l.download_and_keep_resident(ctx).await?;

View File

@@ -19,7 +19,7 @@ use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use super::layer_manager::LayerManager;
use super::layer_manager::{LayerManager, LayerManagerLockHolder};
use super::{FlushLayerError, Timeline};
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::TaskKind;
@@ -199,7 +199,10 @@ pub(crate) async fn generate_tombstone_image_layer(
let image_lsn = ancestor_lsn;
{
let layers = detached.layers.read().await;
let layers = detached
.layers
.read(LayerManagerLockHolder::DetachAncestor)
.await;
for layer in layers.all_persistent_layers() {
if !layer.is_delta
&& layer.lsn_range.start == image_lsn
@@ -423,7 +426,7 @@ pub(super) async fn prepare(
// we do not need to start from our layers, because they can only be layers that come
// *after* ancestor_lsn
let layers = tokio::select! {
guard = ancestor.layers.read() => guard,
guard = ancestor.layers.read(LayerManagerLockHolder::DetachAncestor) => guard,
_ = detached.cancel.cancelled() => {
return Err(ShuttingDown);
}
@@ -869,7 +872,12 @@ async fn remote_copy(
// Double check that the file is orphan (probably from an earlier attempt), then delete it
let key = file_name.clone().into();
if adoptee.layers.read().await.contains_key(&key) {
if adoptee
.layers
.read(LayerManagerLockHolder::DetachAncestor)
.await
.contains_key(&key)
{
// We are supposed to filter out such cases before coming to this function
return Err(Error::Prepare(anyhow::anyhow!(
"layer file {file_name} already present and inside layer map"

View File

@@ -33,6 +33,7 @@ use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::storage_layer::LayerVisibilityHint;
use crate::tenant::tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit, sleep_random};
use crate::tenant::timeline::EvictionError;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{LogicalSizeCalculationCause, TenantShard};
#[derive(Default)]
@@ -208,7 +209,7 @@ impl Timeline {
let mut js = tokio::task::JoinSet::new();
{
let guard = self.layers.read().await;
let guard = self.layers.read(LayerManagerLockHolder::Eviction).await;
guard
.likely_resident_layers()

View File

@@ -15,6 +15,7 @@ use super::{Timeline, TimelineDeleteProgress};
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
mod flow;
mod importbucket_client;
@@ -163,7 +164,10 @@ async fn prepare_import(
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline.layers.write().await;
let mut guard = timeline
.layers
.write(LayerManagerLockHolder::ImportPgData)
.await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it

View File

@@ -56,6 +56,7 @@ use crate::pgdatadir_mapping::{
};
use crate::task_mgr::TaskKind;
use crate::tenant::storage_layer::{AsLayerDesc, ImageLayerWriter, Layer};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
pub async fn run(
timeline: Arc<Timeline>,
@@ -984,7 +985,10 @@ impl ChunkProcessingJob {
let (desc, path) = writer.finish(ctx).await?;
{
let guard = timeline.layers.read().await;
let guard = timeline
.layers
.read(LayerManagerLockHolder::ImportPgData)
.await;
let existing_layer = guard.try_get_from_key(&desc.key());
if let Some(layer) = existing_layer {
if layer.metadata().generation == timeline.generation {
@@ -1007,7 +1011,10 @@ impl ChunkProcessingJob {
// certain that the existing layer is identical to the new one, so in that case
// we replace the old layer with the one we just generated.
let mut guard = timeline.layers.write().await;
let mut guard = timeline
.layers
.write(LayerManagerLockHolder::ImportPgData)
.await;
let existing_layer = guard
.try_get_from_key(&resident_layer.layer_desc().key())
@@ -1036,7 +1043,7 @@ impl ChunkProcessingJob {
}
}
crate::tenant::timeline::drop_wlock(guard);
crate::tenant::timeline::drop_layer_manager_wlock(guard);
timeline
.remote_client

View File

@@ -1,5 +1,8 @@
use std::collections::HashMap;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, bail, ensure};
use itertools::Itertools;
@@ -20,6 +23,155 @@ use crate::tenant::storage_layer::{
PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
};
/// Warn if the lock was held for longer than this threshold.
/// It's very generous and we should bring this value down over time.
const LAYER_MANAGER_LOCK_WARN_THRESHOLD: Duration = Duration::from_secs(5);
const LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD: Duration = Duration::from_secs(30);
/// Describes the operation that is holding the layer manager lock
#[derive(Debug, Clone, Copy, strum_macros::Display)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum LayerManagerLockHolder {
GetLayerMapInfo,
GenerateHeatmap,
GetPage,
Init,
LoadLayerMap,
GetLayerForWrite,
TryFreezeLayer,
FlushFrozenLayer,
FlushLoop,
Compaction,
GarbageCollection,
Shutdown,
ImportPgData,
DetachAncestor,
Eviction,
#[cfg(test)]
Testing,
}
/// Wrapper for the layer manager that tracks the amount of time during which
/// it was held under read or write lock
#[derive(Default)]
pub(crate) struct LockedLayerManager {
locked: tokio::sync::RwLock<LayerManager>,
}
pub(crate) struct LayerManagerReadGuard<'a> {
guard: ManuallyDrop<tokio::sync::RwLockReadGuard<'a, LayerManager>>,
acquired_at: std::time::Instant,
holder: LayerManagerLockHolder,
}
pub(crate) struct LayerManagerWriteGuard<'a> {
guard: ManuallyDrop<tokio::sync::RwLockWriteGuard<'a, LayerManager>>,
acquired_at: std::time::Instant,
holder: LayerManagerLockHolder,
}
impl Drop for LayerManagerReadGuard<'_> {
fn drop(&mut self) {
// Drop the lock first, before potentially warning if it was held for too long.
// SAFETY: ManuallyDrop in Drop implementation
unsafe { ManuallyDrop::drop(&mut self.guard) };
let held_for = self.acquired_at.elapsed();
if held_for >= LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD {
tracing::warn!(
holder=%self.holder,
"Layer manager read lock held for {}s",
held_for.as_secs_f64(),
);
}
}
}
impl Drop for LayerManagerWriteGuard<'_> {
fn drop(&mut self) {
// Drop the lock first, before potentially warning if it was held for too long.
// SAFETY: ManuallyDrop in Drop implementation
unsafe { ManuallyDrop::drop(&mut self.guard) };
let held_for = self.acquired_at.elapsed();
if held_for >= LAYER_MANAGER_LOCK_WARN_THRESHOLD {
tracing::warn!(
holder=%self.holder,
"Layer manager write lock held for {}s",
held_for.as_secs_f64(),
);
}
}
}
impl Deref for LayerManagerReadGuard<'_> {
type Target = LayerManager;
fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}
impl Deref for LayerManagerWriteGuard<'_> {
type Target = LayerManager;
fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}
impl DerefMut for LayerManagerWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.guard.deref_mut()
}
}
impl LockedLayerManager {
pub(crate) async fn read(&self, holder: LayerManagerLockHolder) -> LayerManagerReadGuard {
let guard = ManuallyDrop::new(self.locked.read().await);
LayerManagerReadGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
}
}
pub(crate) fn try_read(
&self,
holder: LayerManagerLockHolder,
) -> Result<LayerManagerReadGuard, tokio::sync::TryLockError> {
let guard = ManuallyDrop::new(self.locked.try_read()?);
Ok(LayerManagerReadGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
})
}
pub(crate) async fn write(&self, holder: LayerManagerLockHolder) -> LayerManagerWriteGuard {
let guard = ManuallyDrop::new(self.locked.write().await);
LayerManagerWriteGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
}
}
pub(crate) fn try_write(
&self,
holder: LayerManagerLockHolder,
) -> Result<LayerManagerWriteGuard, tokio::sync::TryLockError> {
let guard = ManuallyDrop::new(self.locked.try_write()?);
Ok(LayerManagerWriteGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
})
}
}
/// Provides semantic APIs to manipulate the layer map.
pub(crate) enum LayerManager {
/// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate

View File

@@ -21,7 +21,7 @@ OBJS = \
unstable_extensions.o \
walproposer.o \
walproposer_pg.o \
control_plane_connector.o \
neon_ddl_handler.o \
walsender_hooks.o
PG_CPPFLAGS = -I$(libpq_srcdir)

View File

@@ -1,6 +0,0 @@
#ifndef CONTROL_PLANE_CONNECTOR_H
#define CONTROL_PLANE_CONNECTOR_H
void InitControlPlaneConnector(void);
#endif

View File

@@ -2,6 +2,6 @@ DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer);

View File

@@ -33,9 +33,9 @@
#include "extension_server.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_ddl_handler.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
#include "control_plane_connector.h"
#include "logical_replication_monitor.h"
#include "unstable_extensions.h"
#include "walsender_hooks.h"
@@ -454,7 +454,7 @@ _PG_init(void)
InitUnstableExtensionsSupport();
InitLogicalReplicationMonitor();
InitControlPlaneConnector();
InitDDLHandler();
pg_init_extension_server();

View File

@@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* control_plane_connector.c
* neon_ddl_handler.c
* Captures updates to roles/databases using ProcessUtility_hook and
* sends them to the control ProcessUtility_hook. The changes are sent
* via HTTP to the URL specified by the GUC neon.console_url when the
@@ -13,18 +13,30 @@
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* Support event triggers for neon_superuser
*
* IDENTIFICATION
* contrib/neon/neon_dll_handler.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <curl/curl.h>
#include <unistd.h>
#include "access/xact.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_proc.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/user.h"
#include "fmgr.h"
#include "libpq/crypt.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_func.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/acl.h"
@@ -32,11 +44,16 @@
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/jsonb.h"
#include <utils/lsyscache.h>
#include <utils/syscache.h>
#include "control_plane_connector.h"
#include "neon_ddl_handler.h"
#include "neon_utils.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
static fmgr_hook_type next_fmgr_hook = NULL;
static needs_fmgr_hook_type next_needs_fmgr_hook = NULL;
static bool neon_event_triggers = true;
static const char *jwt_token = NULL;
@@ -773,6 +790,7 @@ HandleDropRole(DropRoleStmt *stmt)
}
}
static void
HandleRename(RenameStmt *stmt)
{
@@ -782,6 +800,460 @@ HandleRename(RenameStmt *stmt)
return HandleRoleRename(stmt);
}
/*
* Support for Event Triggers.
*
* In vanilla only superuser can create Event Triggers.
*
* We allow it for neon_superuser by temporary switching to superuser. But as
* far as event trigger can fire in superuser context we should protect
* superuser from execution of arbitrary user's code.
*
* The idea was taken from Supabase PR series starting at
* https://github.com/supabase/supautils/pull/98
*/
static bool
neon_needs_fmgr_hook(Oid functionId) {
return (next_needs_fmgr_hook && (*next_needs_fmgr_hook) (functionId))
|| get_func_rettype(functionId) == EVENT_TRIGGEROID;
}
static void
LookupFuncOwnerSecDef(Oid functionId, Oid *funcOwner, bool *is_secdef)
{
Form_pg_proc procForm;
HeapTuple proc_tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionId));
if (!HeapTupleIsValid(proc_tup))
ereport(ERROR,
(errmsg("cache lookup failed for function %u", functionId)));
procForm = (Form_pg_proc) GETSTRUCT(proc_tup);
*funcOwner = procForm->proowner;
*is_secdef = procForm->prosecdef;
ReleaseSysCache(proc_tup);
}
PG_FUNCTION_INFO_V1(noop);
Datum noop(__attribute__ ((unused)) PG_FUNCTION_ARGS) { PG_RETURN_VOID();}
static void
force_noop(FmgrInfo *finfo)
{
finfo->fn_addr = (PGFunction) noop;
finfo->fn_oid = InvalidOid; /* not a known function OID anymore */
finfo->fn_nargs = 0; /* no arguments for noop */
finfo->fn_strict = false;
finfo->fn_retset = false;
finfo->fn_stats = 0; /* no stats collection */
finfo->fn_extra = NULL; /* clear out old context data */
finfo->fn_mcxt = CurrentMemoryContext;
finfo->fn_expr = NULL; /* no parse tree */
}
/*
* Skip executing Event Triggers execution for superusers, because Event
* Triggers are SECURITY DEFINER and user provided code could then attempt
* privilege escalation.
*
* Also skip executing Event Triggers when GUC neon.event_triggers has been
* set to false. This might be necessary to be able to connect again after a
* LOGIN Event Trigger has been installed that would prevent connections as
* neon_superuser.
*/
static void
neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
{
/*
* It can be other needs_fmgr_hook which cause our hook to be invoked for
* non-trigger function, so recheck that is is trigger function.
*/
if (flinfo->fn_oid != InvalidOid &&
get_func_rettype(flinfo->fn_oid) != EVENT_TRIGGEROID)
{
if (next_fmgr_hook)
(*next_fmgr_hook) (event, flinfo, private);
return;
}
/*
* The neon_superuser role can use the GUC neon.event_triggers to disable
* firing Event Trigger.
*
* SET neon.event_triggers TO false;
*
* This only applies to the neon_superuser role though, and only allows
* skipping Event Triggers owned by neon_superuser, which we check by
* proxy of the Event Trigger function being owned by neon_superuser.
*
* A role that is created in role neon_superuser should be allowed to also
* benefit from the neon_event_triggers GUC, and will be considered the
* same as the neon_superuser role.
*/
if (event == FHET_START
&& !neon_event_triggers
&& is_neon_superuser())
{
Oid neon_superuser_oid = get_role_oid("neon_superuser", false);
/* Find the Function Attributes (owner Oid, security definer) */
const char *fun_owner_name = NULL;
Oid fun_owner = InvalidOid;
bool fun_is_secdef = false;
LookupFuncOwnerSecDef(flinfo->fn_oid, &fun_owner, &fun_is_secdef);
fun_owner_name = GetUserNameFromId(fun_owner, false);
if (RoleIsNeonSuperuser(fun_owner_name)
|| has_privs_of_role(fun_owner, neon_superuser_oid))
{
elog(WARNING,
"Skipping Event Trigger: neon.event_triggers is false");
/*
* we can't skip execution directly inside the fmgr_hook so instead we
* change the event trigger function to a noop function.
*/
force_noop(flinfo);
}
}
/*
* Fire Event Trigger if both function owner and current user are
* superuser, or none of them are.
*/
else if (event == FHET_START
/* still enable it to pass pg_regress tests */
&& !RegressTestMode)
{
/*
* Get the current user oid as of before SECURITY DEFINER change of
* CurrentUserId, and that would be SessionUserId.
*/
Oid current_role_oid = GetSessionUserId();
bool role_is_super = superuser_arg(current_role_oid);
/* Find the Function Attributes (owner Oid, security definer) */
Oid function_owner = InvalidOid;
bool function_is_secdef = false;
bool function_is_owned_by_super = false;
LookupFuncOwnerSecDef(flinfo->fn_oid, &function_owner, &function_is_secdef);
function_is_owned_by_super = superuser_arg(function_owner);
/*
* 1. Refuse to run SECURITY DEFINER function that belongs to a
* superuser when the current user is not a superuser itself.
*/
if (!role_is_super
&& function_is_owned_by_super
&& function_is_secdef)
{
char *func_name = get_func_name(flinfo->fn_oid);
ereport(WARNING,
(errmsg("Skipping Event Trigger"),
errdetail("Event Trigger function \"%s\" is owned by \"%s\" "
"and is SECURITY DEFINER",
func_name,
GetUserNameFromId(function_owner, false))));
/*
* we can't skip execution directly inside the fmgr_hook so
* instead we change the event trigger function to a noop
* function.
*/
force_noop(flinfo);
}
/*
* 2. Refuse to run functions that belongs to a non-superuser when the
* current user is a superuser.
*
* We could run a SECURITY DEFINER user-function here and be safe with
* privilege escalation risks, but superuser roles are only used for
* infrastructure maintenance operations, where we prefer to skip
* running user-defined code.
*/
else if (role_is_super && !function_is_owned_by_super)
{
char *func_name = get_func_name(flinfo->fn_oid);
ereport(WARNING,
(errmsg("Skipping Event Trigger"),
errdetail("Event Trigger function \"%s\" "
"is owned by non-superuser role \"%s\", "
"and current_user \"%s\" is superuser",
func_name,
GetUserNameFromId(function_owner, false),
GetUserNameFromId(current_role_oid, false))));
/*
* we can't skip execution directly inside the fmgr_hook so
* instead we change the event trigger function to a noop
* function.
*/
force_noop(flinfo);
}
}
if (next_fmgr_hook)
(*next_fmgr_hook) (event, flinfo, private);
}
static Oid prev_role_oid = 0;
static int prev_role_sec_context = 0;
static bool switched_to_superuser = false;
/*
* Switch tp superuser if not yet superuser.
* Returns false if already switched to superuser.
*/
static bool
switch_to_superuser(void)
{
Oid superuser_oid;
if (switched_to_superuser)
return false;
switched_to_superuser = true;
superuser_oid = get_role_oid("cloud_admin", true /*missing_ok*/);
if (superuser_oid == InvalidOid)
superuser_oid = BOOTSTRAP_SUPERUSERID;
GetUserIdAndSecContext(&prev_role_oid, &prev_role_sec_context);
SetUserIdAndSecContext(superuser_oid, prev_role_sec_context |
SECURITY_LOCAL_USERID_CHANGE |
SECURITY_RESTRICTED_OPERATION);
return true;
}
static void
switch_to_original_role(void)
{
SetUserIdAndSecContext(prev_role_oid, prev_role_sec_context);
switched_to_superuser = false;
}
/*
* ALTER ROLE ... SUPERUSER;
*
* Used internally to give superuser to a non-privileged role to allow
* ownership of superuser-only objects such as Event Trigger.
*
* ALTER ROLE foo SUPERUSER;
* ALTER EVENT TRIGGER ... OWNED BY foo;
* ALTER ROLE foo NOSUPERUSER;
*
* Now the EVENT TRIGGER is owned by foo, who can DROP it without having to be
* superuser again.
*/
static void
alter_role_super(const char* rolename, bool make_super)
{
AlterRoleStmt *alter_stmt = makeNode(AlterRoleStmt);
DefElem *defel_superuser =
#if PG_MAJORVERSION_NUM <= 14
makeDefElem("superuser", (Node *) makeInteger(make_super), -1);
#else
makeDefElem("superuser", (Node *) makeBoolean(make_super), -1);
#endif
RoleSpec *rolespec = makeNode(RoleSpec);
rolespec->roletype = ROLESPEC_CSTRING;
rolespec->rolename = pstrdup(rolename);
rolespec->location = -1;
alter_stmt->role = rolespec;
alter_stmt->options = list_make1(defel_superuser);
#if PG_MAJORVERSION_NUM < 15
AlterRole(alter_stmt);
#else
/* ParseState *pstate, AlterRoleStmt *stmt */
AlterRole(NULL, alter_stmt);
#endif
CommandCounterIncrement();
}
/*
* Changes the OWNER of an Event Trigger.
*
* Event Triggers can only be owned by superusers, so this ALTER ROLE with
* SUPERUSER and then removes the property.
*/
static void
alter_event_trigger_owner(const char *obj_name, Oid role_oid)
{
char* role_name = GetUserNameFromId(role_oid, false);
alter_role_super(role_name, true);
AlterEventTriggerOwner(obj_name, role_oid);
CommandCounterIncrement();
alter_role_super(role_name, false);
}
/*
* Neon processing of the CREATE EVENT TRIGGER requires special attention and
* is worth having its own ProcessUtility_hook for that.
*/
static void
ProcessCreateEventTrigger(
PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc)
{
Node *parseTree = pstmt->utilityStmt;
bool sudo = false;
/* We double-check that after local variable declaration block */
CreateEventTrigStmt *stmt = (CreateEventTrigStmt *) parseTree;
/*
* We are going to change the current user privileges (sudo) and might
* need after execution cleanup. For that we want to capture the UserId
* before changing it for our sudo implementation.
*/
const Oid current_user_id = GetUserId();
bool current_user_is_super = superuser_arg(current_user_id);
if (nodeTag(parseTree) != T_CreateEventTrigStmt)
{
ereport(ERROR,
errcode(ERRCODE_INTERNAL_ERROR),
errmsg("ProcessCreateEventTrigger called for the wrong command"));
}
/*
* Allow neon_superuser to create Event Trigger, while keeping the
* ownership of the object.
*
* For that we give superuser membership to the role for the execution of
* the command.
*/
if (IsTransactionState() && is_neon_superuser())
{
/* Find the Event Trigger function Oid */
Oid func_oid = LookupFuncName(stmt->funcname, 0, NULL, false);
/* Find the Function Owner Oid */
Oid func_owner = InvalidOid;
bool is_secdef = false;
bool function_is_owned_by_super = false;
LookupFuncOwnerSecDef(func_oid, &func_owner, &is_secdef);
function_is_owned_by_super = superuser_arg(func_owner);
if(!current_user_is_super && function_is_owned_by_super)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("Permission denied to execute "
"a function owned by a superuser role"),
errdetail("current user \"%s\" is not a superuser "
"and Event Trigger function \"%s\" "
"is owned by a superuser",
GetUserNameFromId(current_user_id, false),
NameListToString(stmt->funcname))));
}
if(current_user_is_super && !function_is_owned_by_super)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("Permission denied to execute "
"a function owned by a non-superuser role"),
errdetail("current user \"%s\" is a superuser "
"and function \"%s\" is "
"owned by a non-superuser",
GetUserNameFromId(current_user_id, false),
NameListToString(stmt->funcname))));
}
sudo = switch_to_superuser();
}
PG_TRY();
{
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
/*
* Now that the Event Trigger has been installed via our sudo
* mechanism, if the original role was not a superuser then change
* the event trigger ownership back to the original role.
*
* That way [ ALTER | DROP ] EVENT TRIGGER commands just work.
*/
if (IsTransactionState() && is_neon_superuser())
{
if (!current_user_is_super)
{
/*
* Change event trigger owner to the current role (making
* it a privileged role during the ALTER OWNER command).
*/
alter_event_trigger_owner(stmt->trigname, current_user_id);
}
}
}
PG_FINALLY();
{
if (sudo)
switch_to_original_role();
}
PG_END_TRY();
}
/*
* Neon hooks for DDLs (handling privileges, limiting features, etc).
*/
static void
NeonProcessUtility(
PlannedStmt *pstmt,
@@ -795,6 +1267,27 @@ NeonProcessUtility(
{
Node *parseTree = pstmt->utilityStmt;
/*
* The process utility hook for CREATE EVENT TRIGGER is its own
* implementation and warrant being addressed separately from here.
*/
if (nodeTag(parseTree) == T_CreateEventTrigStmt)
{
ProcessCreateEventTrigger(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
return;
}
/*
* Other commands that need Neon specific implementations are handled here:
*/
switch (nodeTag(parseTree))
{
case T_CreatedbStmt:
@@ -833,37 +1326,82 @@ NeonProcessUtility(
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
}
/*
* Only neon_superuser is granted privilege to edit neon.event_triggers GUC.
*/
static void
neon_event_triggers_assign_hook(bool newval, void *extra)
{
/* MyDatabaseId == InvalidOid || !OidIsValid(GetUserId()) */
if (IsTransactionState() && !is_neon_superuser())
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to set neon.event_triggers"),
errdetail("Only \"neon_superuser\" is allowed to set the GUC")));
}
}
void
InitControlPlaneConnector()
InitDDLHandler()
{
PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = NeonProcessUtility;
next_needs_fmgr_hook = needs_fmgr_hook;
needs_fmgr_hook = neon_needs_fmgr_hook;
next_fmgr_hook = fmgr_hook;
fmgr_hook = neon_fmgr_hook;
RegisterXactCallback(NeonXactCallback, NULL);
RegisterSubXactCallback(NeonSubXactCallback, NULL);
/*
* The GUC neon.event_triggers should provide the same effect as the
* Postgres GUC event_triggers, but the neon one is PGC_USERSET.
*
* This allows using the GUC in the connection string and work out of a
* LOGIN Event Trigger that would break database access, all without
* having to edit and reload the Postgres configuration file.
*/
DefineCustomBoolVariable(
"neon.event_triggers",
"Enable firing of event triggers",
NULL,
&neon_event_triggers,
true,
PGC_USERSET,
0,
NULL,
neon_event_triggers_assign_hook,
NULL);
DefineCustomStringVariable(
"neon.console_url",
"URL of the Neon Console, which will be forwarded changes to dbs and roles",

View File

@@ -0,0 +1,6 @@
#ifndef CONTROL_DDL_HANDLER_H
#define CONTROL_DDL_HANDLER_H
void InitDDLHandler(void);
#endif

10
poetry.lock generated
View File

@@ -3051,19 +3051,19 @@ files = [
[[package]]
name = "requests"
version = "2.32.3"
version = "2.32.4"
description = "Python HTTP for Humans."
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"},
{file = "requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760"},
{file = "requests-2.32.4-py3-none-any.whl", hash = "sha256:27babd3cda2a6d50b30443204ee89830707d396671944c998b5975b031ac2b2c"},
{file = "requests-2.32.4.tar.gz", hash = "sha256:27d0316682c8a29834d3264820024b62a36942083d52caf2f14c0591336d3422"},
]
[package.dependencies]
certifi = ">=2017.4.17"
charset-normalizer = ">=2,<4"
charset_normalizer = ">=2,<4"
idna = ">=2.5,<4"
urllib3 = ">=1.21.1,<3"
@@ -3846,4 +3846,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "7ab1e7b975af34b3271b7c6018fa22a261d3f73c7c0a0403b6b2bb86b5fbd36e"
content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5"

View File

@@ -4,6 +4,8 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use arc_swap::ArcSwapOption;
use base64::Engine as _;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use clashmap::ClashMap;
use jose_jwk::crypto::KeyInfo;
use reqwest::{Client, redirect};
@@ -347,17 +349,17 @@ impl JwkCacheEntryLock {
.split_once('.')
.ok_or(JwtEncodingError::InvalidCompactForm)?;
let header = base64::decode_config(header, base64::URL_SAFE_NO_PAD)?;
let header = BASE64_URL_SAFE_NO_PAD.decode(header)?;
let header = serde_json::from_slice::<JwtHeader<'_>>(&header)?;
let payloadb = base64::decode_config(payload, base64::URL_SAFE_NO_PAD)?;
let payloadb = BASE64_URL_SAFE_NO_PAD.decode(payload)?;
let payload = serde_json::from_slice::<JwtPayload<'_>>(&payloadb)?;
if let Some(iss) = &payload.issuer {
ctx.set_jwt_issuer(iss.as_ref().to_owned());
}
let sig = base64::decode_config(signature, base64::URL_SAFE_NO_PAD)?;
let sig = BASE64_URL_SAFE_NO_PAD.decode(signature)?;
let kid = header.key_id.ok_or(JwtError::MissingKeyId)?;
@@ -796,7 +798,6 @@ mod tests {
use std::net::SocketAddr;
use std::time::SystemTime;
use base64::URL_SAFE_NO_PAD;
use bytes::Bytes;
use http::Response;
use http_body_util::Full;
@@ -871,9 +872,8 @@ mod tests {
key_id: Some(Cow::Owned(kid)),
};
let header =
base64::encode_config(serde_json::to_string(&header).unwrap(), URL_SAFE_NO_PAD);
let body = base64::encode_config(serde_json::to_string(&body).unwrap(), URL_SAFE_NO_PAD);
let header = BASE64_URL_SAFE_NO_PAD.encode(serde_json::to_string(&header).unwrap());
let body = BASE64_URL_SAFE_NO_PAD.encode(serde_json::to_string(&body).unwrap());
format!("{header}.{body}")
}
@@ -883,7 +883,7 @@ mod tests {
let payload = build_jwt_payload(kid, jose_jwa::Signing::Es256);
let sig: Signature = SigningKey::from(key).sign(payload.as_bytes());
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
let sig = BASE64_URL_SAFE_NO_PAD.encode(sig.to_bytes());
format!("{payload}.{sig}")
}
@@ -893,7 +893,7 @@ mod tests {
let payload = build_custom_jwt_payload(kid, body, jose_jwa::Signing::Es256);
let sig: Signature = SigningKey::from(key).sign(payload.as_bytes());
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
let sig = BASE64_URL_SAFE_NO_PAD.encode(sig.to_bytes());
format!("{payload}.{sig}")
}
@@ -904,7 +904,7 @@ mod tests {
let payload = build_jwt_payload(kid, jose_jwa::Signing::Rs256);
let sig = SigningKey::<sha2::Sha256>::new(key).sign(payload.as_bytes());
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
let sig = BASE64_URL_SAFE_NO_PAD.encode(sig.to_bytes());
format!("{payload}.{sig}")
}

View File

@@ -11,11 +11,13 @@ use anyhow::Context;
use anyhow::{bail, ensure};
use arc_swap::ArcSwapOption;
use futures::future::Either;
use itertools::{Itertools, Position};
use rand::{Rng, thread_rng};
use remote_storage::RemoteStorageConfig;
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, warn};
use tracing::{Instrument, error, info, warn};
use utils::sentry_init::init_sentry;
use utils::{project_build_tag, project_git_version};
@@ -314,7 +316,7 @@ pub async fn run() -> anyhow::Result<()> {
let jemalloc = match crate::jemalloc::MetricRecorder::new() {
Ok(t) => Some(t),
Err(e) => {
tracing::error!(error = ?e, "could not start jemalloc metrics loop");
error!(error = ?e, "could not start jemalloc metrics loop");
None
}
};
@@ -520,23 +522,44 @@ pub async fn run() -> anyhow::Result<()> {
}
}
// Try to connect to Redis 3 times with 1 + (0..0.1) second interval.
// This prevents immediate exit and pod restart,
// which can cause hammering of the redis in case of connection issues.
if let Some(mut redis_kv_client) = redis_kv_client {
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(
&mut redis_kv_client,
rx_cancel,
args.cancellation_batch_size,
)
.await?;
for attempt in (0..3).with_position() {
match redis_kv_client.try_connect().await {
Ok(()) => {
info!("Connected to Redis KV client");
maintenance_tasks.spawn(async move {
handle_cancel_messages(
&mut redis_kv_client,
rx_cancel,
args.cancellation_batch_size,
)
.await?;
drop(redis_kv_client);
drop(redis_kv_client);
// `handle_cancel_messages` was terminated due to the tx_cancel
// being dropped. this is not worthy of an error, and this task can only return `Err`,
// so let's wait forever instead.
std::future::pending().await
});
// `handle_cancel_messages` was terminated due to the tx_cancel
// being dropped. this is not worthy of an error, and this task can only return `Err`,
// so let's wait forever instead.
std::future::pending().await
});
break;
}
Err(e) => {
error!("Failed to connect to Redis KV client: {e}");
if matches!(attempt, Position::Last(_)) {
bail!(
"Failed to connect to Redis KV client after {} attempts",
attempt.into_inner()
);
}
let jitter = thread_rng().gen_range(0..100);
tokio::time::sleep(Duration::from_millis(1000 + jitter)).await;
}
}
}
}
if let Some(regional_redis_client) = regional_redis_client {

View File

@@ -1,5 +1,8 @@
//! Definition and parser for channel binding flag (a part of the `GS2` header).
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
/// Channel binding flag (possibly with params).
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum ChannelBinding<T> {
@@ -55,7 +58,7 @@ impl<T: std::fmt::Display> ChannelBinding<T> {
let mut cbind_input = vec![];
write!(&mut cbind_input, "p={mode},,",).unwrap();
cbind_input.extend_from_slice(get_cbind_data(mode)?);
base64::encode(&cbind_input).into()
BASE64_STANDARD.encode(&cbind_input).into()
}
})
}
@@ -70,9 +73,9 @@ mod tests {
use ChannelBinding::*;
let cases = [
(NotSupportedClient, base64::encode("n,,")),
(NotSupportedServer, base64::encode("y,,")),
(Required("foo"), base64::encode("p=foo,,bar")),
(NotSupportedClient, BASE64_STANDARD.encode("n,,")),
(NotSupportedServer, BASE64_STANDARD.encode("y,,")),
(Required("foo"), BASE64_STANDARD.encode("p=foo,,bar")),
];
for (cb, input) in cases {

View File

@@ -2,6 +2,8 @@
use std::convert::Infallible;
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use sha2::Sha256;
@@ -105,7 +107,7 @@ pub(crate) async fn exchange(
secret: &ServerSecret,
password: &[u8],
) -> sasl::Result<sasl::Outcome<super::ScramKey>> {
let salt = base64::decode(&secret.salt_base64)?;
let salt = BASE64_STANDARD.decode(&secret.salt_base64)?;
let client_key = derive_client_key(pool, endpoint, password, &salt, secret.iterations).await;
if secret.is_password_invalid(&client_key).into() {

View File

@@ -3,6 +3,9 @@
use std::fmt;
use std::ops::Range;
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use super::base64_decode_array;
use super::key::{SCRAM_KEY_LEN, ScramKey};
use super::signature::SignatureBuilder;
@@ -88,7 +91,7 @@ impl<'a> ClientFirstMessage<'a> {
let mut message = String::new();
write!(&mut message, "r={}", self.nonce).unwrap();
base64::encode_config_buf(nonce, base64::STANDARD, &mut message);
BASE64_STANDARD.encode_string(nonce, &mut message);
let combined_nonce = 2..message.len();
write!(&mut message, ",s={salt_base64},i={iterations}").unwrap();
@@ -142,11 +145,7 @@ impl<'a> ClientFinalMessage<'a> {
server_key: &ScramKey,
) -> String {
let mut buf = String::from("v=");
base64::encode_config_buf(
signature_builder.build(server_key),
base64::STANDARD,
&mut buf,
);
BASE64_STANDARD.encode_string(signature_builder.build(server_key), &mut buf);
buf
}
@@ -251,7 +250,7 @@ mod tests {
"iiYEfS3rOgn8S3rtpSdrOsHtPLWvIkdgmHxA0hf3JNOAG4dU"
);
assert_eq!(
base64::encode(msg.proof),
BASE64_STANDARD.encode(msg.proof),
"SRpfsIVS4Gk11w1LqQ4QvCUBZYQmqXNSDEcHqbQ3CHI="
);
}

View File

@@ -15,6 +15,8 @@ mod secret;
mod signature;
pub mod threadpool;
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
pub(crate) use exchange::{Exchange, exchange};
use hmac::{Hmac, Mac};
pub(crate) use key::ScramKey;
@@ -32,7 +34,7 @@ pub(crate) const METHODS_WITHOUT_PLUS: &[&str] = &[SCRAM_SHA_256];
fn base64_decode_array<const N: usize>(input: impl AsRef<[u8]>) -> Option<[u8; N]> {
let mut bytes = [0u8; N];
let size = base64::decode_config_slice(input, base64::STANDARD, &mut bytes).ok()?;
let size = BASE64_STANDARD.decode_slice(input, &mut bytes).ok()?;
if size != N {
return None;
}

View File

@@ -1,5 +1,7 @@
//! Tools for SCRAM server secret management.
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use subtle::{Choice, ConstantTimeEq};
use super::base64_decode_array;
@@ -56,7 +58,7 @@ impl ServerSecret {
// iteration count 1 for our generated passwords going forward.
// PG16 users can set iteration count=1 already today.
iterations: 1,
salt_base64: base64::encode(nonce),
salt_base64: BASE64_STANDARD.encode(nonce),
stored_key: ScramKey::default(),
server_key: ScramKey::default(),
doomed: true,
@@ -88,7 +90,7 @@ mod tests {
assert_eq!(parsed.iterations, iterations);
assert_eq!(parsed.salt_base64, salt);
assert_eq!(base64::encode(parsed.stored_key), stored_key);
assert_eq!(base64::encode(parsed.server_key), server_key);
assert_eq!(BASE64_STANDARD.encode(parsed.stored_key), stored_key);
assert_eq!(BASE64_STANDARD.encode(parsed.server_key), server_key);
}
}

View File

@@ -16,6 +16,8 @@ use std::sync::atomic::AtomicUsize;
use std::task::{Poll, ready};
use std::time::Duration;
use base64::Engine as _;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use ed25519_dalek::{Signature, Signer, SigningKey};
use futures::Future;
use futures::future::poll_fn;
@@ -346,7 +348,7 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
jwt.push_str("eyJhbGciOiJFZERTQSJ9.");
// encode the jwt payload in-place
base64::encode_config_buf(payload, base64::URL_SAFE_NO_PAD, &mut jwt);
BASE64_URL_SAFE_NO_PAD.encode_string(payload, &mut jwt);
// create the signature from the encoded header || payload
let sig: Signature = sk.sign(jwt.as_bytes());
@@ -354,7 +356,7 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
jwt.push('.');
// encode the jwt signature in-place
base64::encode_config_buf(sig.to_bytes(), base64::URL_SAFE_NO_PAD, &mut jwt);
BASE64_URL_SAFE_NO_PAD.encode_string(sig.to_bytes(), &mut jwt);
debug_assert_eq!(
jwt.len(),

View File

@@ -3,6 +3,8 @@ pub mod postgres_rustls;
pub mod server_config;
use anyhow::Context;
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use rustls::pki_types::CertificateDer;
use sha2::{Digest, Sha256};
use tracing::{error, info};
@@ -58,7 +60,7 @@ impl TlsServerEndPoint {
let oid = certificate.signature_algorithm.oid;
if SHA256_OIDS.contains(&oid) {
let tls_server_end_point: [u8; 32] = Sha256::new().chain_update(cert).finalize().into();
info!(%subject, tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
info!(%subject, tls_server_end_point = %BASE64_STANDARD.encode(tls_server_end_point), "determined channel binding");
Ok(Self::Sha256(tls_server_end_point))
} else {
error!(%subject, "unknown channel binding");

View File

@@ -9,7 +9,7 @@ pytest = "^7.4.4"
psycopg2-binary = "^2.9.10"
typing-extensions = "^4.12.2"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.32.3"
requests = "^2.32.4"
pytest-xdist = "^3.3.1"
asyncpg = "^0.30.0"
aiopg = "^1.4.0"

View File

@@ -0,0 +1 @@
ALTER TABLE nodes DROP listen_grpc_addr, listen_grpc_port;

View File

@@ -0,0 +1 @@
ALTER TABLE nodes ADD listen_grpc_addr VARCHAR NULL, ADD listen_grpc_port INTEGER NULL;

View File

@@ -5,10 +5,11 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::StatusCode;
use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
@@ -420,23 +421,31 @@ impl ComputeHook {
preferred_az: _preferred_az,
} = reconfigure_request;
let compute_pageservers = shards
.iter()
.map(|shard| {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
.expect("Unknown pageserver");
let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(pg_host, pg_port.unwrap_or(5432))
})
.collect::<Vec<_>>();
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
tracing::info!("Reconfiguring endpoint {endpoint_name}");
let pageservers = shards
.iter()
.map(|shard| {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
.expect("Unknown pageserver");
if endpoint.grpc {
let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address");
let (host, port) = parse_host_port(addr).expect("invalid gRPC address");
let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(PageserverProtocol::Libpq, host, port.unwrap_or(5432))
}
})
.collect::<Vec<_>>();
endpoint
.reconfigure(compute_pageservers.clone(), *stripe_size, None)
.reconfigure(pageservers, *stripe_size, None)
.await
.map_err(NotifyError::NeonLocal)?;
}

View File

@@ -1398,6 +1398,31 @@ async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, Ap
)
}
async fn handle_tenant_timeline_locate(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
check_permissions(&req, Scope::Admin)?;
maybe_rate_limit(&req, tenant_id).await;
match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(_req) => {}
};
json_response(
StatusCode::OK,
service
.tenant_timeline_locate(tenant_id, timeline_id)
.await?,
)
}
async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -2045,10 +2070,10 @@ pub fn make_router(
router
.data(Arc::new(HttpState::new(service, auth, build_info)))
// Non-prefixed generic endpoints (status, metrics, profiling)
.get("/metrics", |r| {
named_request_span(r, measured_metrics_handler, RequestName("metrics"))
})
// Non-prefixed generic endpoints (status, metrics, profiling)
.get("/status", |r| {
named_request_span(r, handle_status, RequestName("status"))
})
@@ -2139,6 +2164,16 @@ pub fn make_router(
)
},
)
.get(
"/debug/v1/tenant/:tenant_id/timeline/:timeline_id/locate",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_locate,
RequestName("v1_tenant_timeline_locate"),
)
},
)
.get("/debug/v1/scheduler", |r| {
named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
})

View File

@@ -97,7 +97,7 @@ pub(crate) struct StorageControllerMetricGroup {
/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
measured::CounterVec<SafekeeperRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
@@ -111,7 +111,7 @@ pub(crate) struct StorageControllerMetricGroup {
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
measured::HistogramVec<SafekeeperRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
@@ -136,16 +136,17 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_leadership_status: measured::GaugeVec<LeadershipStatusGroupSet>,
/// HTTP request status counters for handled requests
/// Indicator of stucked (long-running) reconciles, broken down by tenant, shard and sequence.
/// The metric is automatically removed once the reconciliation completes.
pub(crate) storage_controller_reconcile_long_running:
measured::CounterVec<ReconcileLongRunningLabelGroupSet>,
/// Indicator of safekeeper reconciler queue depth, broken down by safekeeper, excluding ongoing reconciles.
pub(crate) storage_controller_safkeeper_reconciles_queued:
pub(crate) storage_controller_safekeeper_reconciles_queued:
measured::GaugeVec<SafekeeperReconcilerLabelGroupSet>,
/// Indicator of completed safekeeper reconciles, broken down by safekeeper.
pub(crate) storage_controller_safkeeper_reconciles_complete:
pub(crate) storage_controller_safekeeper_reconciles_complete:
measured::CounterVec<SafekeeperReconcilerLabelGroupSet>,
}
@@ -218,6 +219,16 @@ pub(crate) struct PageserverRequestLabelGroup<'a> {
pub(crate) method: Method,
}
#[derive(measured::LabelGroup, Clone)]
#[label(set = SafekeeperRequestLabelGroupSet)]
pub(crate) struct SafekeeperRequestLabelGroup<'a> {
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) safekeeper_id: &'a str,
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) path: &'a str,
pub(crate) method: Method,
}
#[derive(measured::LabelGroup)]
#[label(set = DatabaseQueryErrorLabelGroupSet)]
pub(crate) struct DatabaseQueryErrorLabelGroup {

View File

@@ -37,6 +37,8 @@ pub(crate) struct Node {
listen_pg_addr: String,
listen_pg_port: u16,
listen_grpc_addr: Option<String>,
listen_grpc_port: Option<u16>,
availability_zone_id: AvailabilityZone,
@@ -100,8 +102,8 @@ impl Node {
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
&& self.listen_http_port == register_req.listen_http_port
// Note: listen_https_port may change. See [`Self::need_update`] for mode details.
// && self.listen_https_port == register_req.listen_https_port
// Note: HTTPS and gRPC addresses may change, to allow for migrations. See
// [`Self::need_update`] for more details.
&& self.listen_pg_addr == register_req.listen_pg_addr
&& self.listen_pg_port == register_req.listen_pg_port
&& self.availability_zone_id == register_req.availability_zone_id
@@ -109,9 +111,10 @@ impl Node {
// Do we need to update an existing record in DB on this registration request?
pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
// listen_https_port is checked here because it may change during migration to https.
// After migration, this check may be moved to registration_match.
// These are checked here, since they may change before we're fully migrated.
self.listen_https_port != register_req.listen_https_port
|| self.listen_grpc_addr != register_req.listen_grpc_addr
|| self.listen_grpc_port != register_req.listen_grpc_port
}
/// For a shard located on this node, populate a response object
@@ -125,6 +128,8 @@ impl Node {
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port,
}
}
@@ -211,6 +216,8 @@ impl Node {
listen_https_port: Option<u16>,
listen_pg_addr: String,
listen_pg_port: u16,
listen_grpc_addr: Option<String>,
listen_grpc_port: Option<u16>,
availability_zone_id: AvailabilityZone,
use_https: bool,
) -> anyhow::Result<Self> {
@@ -221,6 +228,10 @@ impl Node {
);
}
if listen_grpc_addr.is_some() != listen_grpc_port.is_some() {
anyhow::bail!("cannot create node {id}: must specify both gRPC address and port");
}
Ok(Self {
id,
listen_http_addr,
@@ -228,6 +239,8 @@ impl Node {
listen_https_port,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
scheduling: NodeSchedulingPolicy::Active,
lifecycle: NodeLifecycle::Active,
availability: NodeAvailability::Offline,
@@ -247,6 +260,8 @@ impl Node {
listen_https_port: self.listen_https_port.map(|x| x as i32),
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port as i32,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port.map(|port| port as i32),
availability_zone_id: self.availability_zone_id.0.clone(),
}
}
@@ -260,6 +275,13 @@ impl Node {
);
}
if np.listen_grpc_addr.is_some() != np.listen_grpc_port.is_some() {
anyhow::bail!(
"can't load node {}: must specify both gRPC address and port",
np.node_id
);
}
Ok(Self {
id: NodeId(np.node_id as u64),
// At startup we consider a node offline until proven otherwise.
@@ -272,6 +294,8 @@ impl Node {
listen_https_port: np.listen_https_port.map(|x| x as u16),
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
listen_grpc_addr: np.listen_grpc_addr,
listen_grpc_port: np.listen_grpc_port.map(|port| port as u16),
availability_zone_id: AvailabilityZone(np.availability_zone_id),
use_https,
cancel: CancellationToken::new(),
@@ -361,6 +385,8 @@ impl Node {
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port,
}
}
}

View File

@@ -2125,6 +2125,8 @@ pub(crate) struct NodePersistence {
pub(crate) availability_zone_id: String,
pub(crate) listen_https_port: Option<i32>,
pub(crate) lifecycle: String,
pub(crate) listen_grpc_addr: Option<String>,
pub(crate) listen_grpc_port: Option<i32>,
}
/// Tenant metadata health status that are stored durably.

View File

@@ -5,7 +5,7 @@ use safekeeper_client::mgmt_api::{Client, Result};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use crate::metrics::PageserverRequestLabelGroup;
use crate::metrics::SafekeeperRequestLabelGroup;
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
@@ -19,8 +19,8 @@ pub(crate) struct SafekeeperClient {
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = PageserverRequestLabelGroup {
pageserver_id: $node_id,
let labels = SafekeeperRequestLabelGroup {
safekeeper_id: $node_id,
path: $name,
method: $method,
};
@@ -35,7 +35,7 @@ macro_rules! measured_request {
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_request_error;
.storage_controller_safekeeper_request_error;
error_counters.inc(labels)
}

View File

@@ -945,6 +945,8 @@ pub(crate) mod test_utils {
None,
format!("pghost-{i}"),
5432 + i as u16,
Some(format!("grpchost-{i}")),
Some(51051 + i as u16),
az_iter
.next()
.cloned()

View File

@@ -34,6 +34,8 @@ diesel::table! {
availability_zone_id -> Varchar,
listen_https_port -> Nullable<Int4>,
lifecycle -> Varchar,
listen_grpc_addr -> Nullable<Varchar>,
listen_grpc_port -> Nullable<Int4>,
}
}

View File

@@ -1683,6 +1683,8 @@ impl Service {
None,
"".to_string(),
123,
None,
None,
AvailabilityZone("test_az".to_string()),
false,
)
@@ -2267,6 +2269,7 @@ impl Service {
// fail, and start from scratch, so it doesn't make sense for us to try and preserve
// the stale/multi states at this point.
mode: LocationConfigMode::AttachedSingle,
stripe_size: shard.shard.stripe_size,
});
shard.generation = std::cmp::max(shard.generation, Some(new_gen));
@@ -2300,6 +2303,7 @@ impl Service {
id: *tenant_shard_id,
r#gen: None,
mode: LocationConfigMode::Secondary,
stripe_size: shard.shard.stripe_size,
});
// We must not update observed, because we have no guarantee that our
@@ -6649,6 +6653,8 @@ impl Service {
/// This is for debug/support only: assuming tenant data is already present in S3, we "create" a
/// tenant with a very high generation number so that it will see the existing data.
/// It does not create timelines on safekeepers, because they might already exist on some
/// safekeeper set. So, the timelines are not storcon-managed after the import.
pub(crate) async fn tenant_import(
&self,
tenant_id: TenantId,
@@ -7250,6 +7256,12 @@ impl Service {
));
}
if register_req.listen_grpc_addr.is_some() != register_req.listen_grpc_port.is_some() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"must specify both gRPC address and port"
)));
}
// Ordering: we must persist the new node _before_ adding it to in-memory state.
// This ensures that before we use it for anything or expose it via any external
// API, it is guaranteed to be available after a restart.
@@ -7260,6 +7272,8 @@ impl Service {
register_req.listen_https_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.listen_grpc_addr,
register_req.listen_grpc_port,
register_req.availability_zone_id.clone(),
self.config.use_https_pageserver_api,
);

View File

@@ -107,7 +107,7 @@ impl ChaosInjector {
// - Skip shards doing a graceful migration already, so that we allow these to run to
// completion rather than only exercising the first part and then cancelling with
// some other chaos.
!matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
&& shard.get_preferred_node().is_none()
}

View File

@@ -230,7 +230,7 @@ impl ReconcilerHandle {
// increase it before putting into the queue.
let queued_gauge = &METRICS_REGISTRY
.metrics_group
.storage_controller_safkeeper_reconciles_queued;
.storage_controller_safekeeper_reconciles_queued;
let label_group = SafekeeperReconcilerLabelGroup {
sk_az: &sk_az,
sk_node_id: &sk_node_id,
@@ -306,7 +306,7 @@ impl SafekeeperReconciler {
let queued_gauge = &METRICS_REGISTRY
.metrics_group
.storage_controller_safkeeper_reconciles_queued;
.storage_controller_safekeeper_reconciles_queued;
queued_gauge.set(
SafekeeperReconcilerLabelGroup {
sk_az: &req.safekeeper.skp.availability_zone_id,
@@ -547,7 +547,7 @@ impl SafekeeperReconcilerInner {
let complete_counter = &METRICS_REGISTRY
.metrics_group
.storage_controller_safkeeper_reconciles_complete;
.storage_controller_safekeeper_reconciles_complete;
complete_counter.inc(SafekeeperReconcilerLabelGroup {
sk_az: &req.safekeeper.skp.availability_zone_id,
sk_node_id: &req.safekeeper.get_id().to_string(),

View File

@@ -17,7 +17,7 @@ use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use safekeeper_api::membership::{MemberSet, SafekeeperGeneration, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -26,6 +26,13 @@ use utils::lsn::Lsn;
use super::Service;
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct TimelineLocateResponse {
pub generation: SafekeeperGeneration,
pub sk_set: Vec<NodeId>,
pub new_sk_set: Option<Vec<NodeId>>,
}
impl Service {
/// Timeline creation on safekeepers
///
@@ -396,6 +403,38 @@ impl Service {
Ok(())
}
/// Locate safekeepers for a timeline.
/// Return the generation, sk_set and new_sk_set if present.
/// If the timeline is not storcon-managed, return NotFound.
pub(crate) async fn tenant_timeline_locate(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineLocateResponse, ApiError> {
let timeline = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(timeline) = timeline else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Timeline {}/{} not found", tenant_id, timeline_id).into(),
));
};
Ok(TimelineLocateResponse {
generation: SafekeeperGeneration::new(timeline.generation as u32),
sk_set: timeline
.sk_set
.iter()
.map(|id| NodeId(*id as u64))
.collect(),
new_sk_set: timeline
.new_sk_set
.map(|sk_set| sk_set.iter().map(|id| NodeId(*id as u64)).collect()),
})
}
/// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
pub(super) async fn tenant_timeline_delete_safekeepers(
self: &Arc<Self>,

View File

@@ -1184,11 +1184,19 @@ impl TenantShard {
for secondary in self.intent.get_secondary() {
// Make sure we don't try to migrate a secondary to our attached location: this case happens
// easily in environments without multiple AZs.
let exclude = match self.intent.attached {
let mut exclude = match self.intent.attached {
Some(attached) => vec![attached],
None => vec![],
};
// Exclude all other secondaries from the scheduling process to avoid replacing
// one existing secondary with another existing secondary.
for another_secondary in self.intent.secondary.iter() {
if another_secondary != secondary {
exclude.push(*another_secondary);
}
}
let replacement = match &self.policy {
PlacementPolicy::Attached(_) => {
// Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
@@ -1348,28 +1356,19 @@ impl TenantShard {
/// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
/// funciton should not be used to decide whether to reconcile.
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
if let Some(attach_intent) = self.intent.attached {
match self.observed.locations.get(&attach_intent) {
Some(loc) => match &loc.conf {
Some(conf) => match conf.mode {
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// Our intent and observed state agree that this node is in an attached state.
Some(attach_intent)
}
// Our observed config is not an attached state
_ => None,
},
// Our observed state is None, i.e. in flux
None => None,
},
// We have no observed state for this node
None => None,
}
} else {
// Our intent is not to attach
None
// We have an intent to attach for this node
let attach_intent = self.intent.attached?;
// We have an observed state for this node
let location = self.observed.locations.get(&attach_intent)?;
// Our observed state is not None, i.e. not in flux
let location_config = location.conf.as_ref()?;
// Check if our intent and observed state agree that this node is in an attached state.
match location_config.mode {
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => Some(attach_intent),
_ => None,
}
}

View File

@@ -69,8 +69,10 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def prewarm_lfc(self):
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
def prewarm_lfc(self, from_endpoint_id: str | None = None):
url: str = f"http://localhost:{self.external_port}/lfc/prewarm"
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
self.post(url, params=params).raise_for_status()
def prewarmed():
json = self.prewarm_lfc_status()

View File

@@ -2223,6 +2223,17 @@ class NeonStorageController(MetricsGetter, LogUtils):
shards: list[dict[str, Any]] = body["shards"]
return shards
def timeline_locate(self, tenant_id: TenantId, timeline_id: TimelineId):
"""
:return: dict {"generation": int, "sk_set": [int], "new_sk_set": [int]}
"""
response = self.request(
"GET",
f"{self.api}/debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def tenant_describe(self, tenant_id: TenantId):
"""
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int, preferred_az_id: str}

View File

@@ -69,6 +69,11 @@ def test_basebackup_cache(neon_env_builder: NeonEnvBuilder):
).value
== i + 1
)
# There should be only one basebackup file in the cache.
assert metrics.query_one("pageserver_basebackup_cache_entries_total").value == 1
# The size of one basebackup for new DB is ~20KB.
size_bytes = metrics.query_one("pageserver_basebackup_cache_size_bytes").value
assert 10 * 1024 <= size_bytes <= 100 * 1024
wait_until(check_metrics)

View File

@@ -18,6 +18,8 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
Safekeeper,
StorageControllerApiException,
flush_ep_to_pageserver,
)
from fixtures.pageserver.http import PageserverApiException
@@ -26,6 +28,7 @@ from fixtures.pageserver.utils import (
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage
from fixtures.safekeeper.http import MembershipConfiguration
from fixtures.workload import Workload
if TYPE_CHECKING:
@@ -125,6 +128,12 @@ check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
reason="CHECK_ONDISK_DATA_COMPATIBILITY env is not set",
)
skip_old_debug_versions = pytest.mark.skipif(
os.getenv("BUILD_TYPE", "debug") == "debug"
and os.getenv("DEFAULT_PG_VERSION") in [PgVersion.V14, PgVersion.V15, PgVersion.V16],
reason="compatibility snaphots not available for old versions of debug builds",
)
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(before="test_forward_compatibility")
@@ -195,6 +204,7 @@ ingest_lag_log_line = ".*ingesting record with timestamp lagging more than wait_
@check_ondisk_data_compatibility_if_enabled
@skip_old_debug_versions
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_backward_compatibility(
@@ -222,6 +232,7 @@ def test_backward_compatibility(
@check_ondisk_data_compatibility_if_enabled
@skip_old_debug_versions
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_forward_compatibility(
@@ -291,7 +302,20 @@ def test_forward_compatibility(
def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path):
ep = env.endpoints.create("main")
ep_env = {"LD_LIBRARY_PATH": str(env.pg_distrib_dir / f"v{env.pg_version}/lib")}
ep.start(env=ep_env)
# If the compatibility snapshot was created with --timelines-onto-safekeepers=false,
# we should not pass safekeeper_generation to the endpoint because the compute
# will not be able to start.
# Zero generation is INVALID_GENERATION.
generation = 0
try:
res = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
generation = res["generation"]
except StorageControllerApiException as e:
if e.status_code != 404 or not re.search(r"Timeline .* not found", str(e)):
raise e
ep.start(env=ep_env, safekeeper_generation=generation)
connstr = ep.connstr()
@@ -341,7 +365,7 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
)
# Timeline exists again: restart the endpoint
ep.start(env=ep_env)
ep.start(env=ep_env, safekeeper_generation=generation)
pg_bin.run_capture(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
@@ -542,6 +566,24 @@ def test_historic_storage_formats(
# All our artifacts should contain at least one timeline
assert len(timelines) > 0
# Import tenant does not create the timeline on safekeepers,
# because it is a debug handler and the timeline may have already been
# created on some set of safekeepers.
# Create the timeline on safekeepers manually.
# TODO(diko): when we have the script/storcon handler to migrate
# the timeline to storcon, we can replace this code with it.
mconf = MembershipConfiguration(
generation=1,
members=Safekeeper.sks_to_safekeeper_ids([env.safekeepers[0]]),
new_members=None,
)
members_sks = Safekeeper.mconf_sks(env, mconf)
for timeline in timelines:
Safekeeper.create_timeline(
dataset.tenant_id, timeline["timeline_id"], env.pageserver, mconf, members_sks
)
# TODO: ensure that the snapshots we're importing contain a sensible variety of content, at the very
# least they should include a mixture of deltas and image layers. Preferably they should also
# contain some "exotic" stuff like aux files from logical replication.
@@ -573,6 +615,7 @@ def test_historic_storage_formats(
@check_ondisk_data_compatibility_if_enabled
@skip_old_debug_versions
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(
**fixtures.utils.allpairs_versions(),

View File

@@ -188,7 +188,8 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet
pg_cur.execute("select pg_reload_conf()")
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.prewarm_lfc()
# Same thing as prewarm_lfc(), testing other method
http_client.prewarm_lfc(endpoint.endpoint_id)
else:
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))

View File

@@ -306,13 +306,7 @@ def test_sql_regress(
)
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start(
"main",
config_lines=[
# Enable the test mode, so that we don't need to patch the test cases.
"neon.regress_test_mode = true",
],
)
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
# Create some local directories for pg_regress to run in.

View File

@@ -430,6 +430,7 @@ def test_tenant_delete_stale_shards(neon_env_builder: NeonEnvBuilder, pg_bin: Pg
workload.init()
workload.write_rows(256)
workload.validate()
workload.stop()
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,

View File

@@ -0,0 +1,90 @@
create or replace function admin_proc()
returns event_trigger
language plpgsql as
$$
begin
raise notice 'admin event trigger is executed for %', current_user;
end;
$$;
create role neon_superuser;
create role neon_admin login inherit createrole createdb in role neon_superuser;
grant create on schema public to neon_admin;
create database neondb with owner neon_admin;
grant all privileges on database neondb to neon_superuser;
create role neon_user;
grant create on schema public to neon_user;
create event trigger on_ddl1 on ddl_command_end
execute procedure admin_proc();
set role neon_user;
-- check that non-privileged user can not change neon.event_triggers
set neon.event_triggers to false;
ERROR: permission denied to set neon.event_triggers
DETAIL: Only "neon_superuser" is allowed to set the GUC
-- Non-privileged neon user should not be able to create event trigers
create event trigger on_ddl2 on ddl_command_end
execute procedure admin_proc();
ERROR: permission denied to create event trigger "on_ddl2"
HINT: Must be superuser to create an event trigger.
set role neon_admin;
-- neon_superuser should be able to create event trigers
create or replace function neon_proc()
returns event_trigger
language plpgsql as
$$
begin
raise notice 'neon event trigger is executed for %', current_user;
end;
$$;
NOTICE: admin event trigger is executed for neon_admin
create event trigger on_ddl2 on ddl_command_end
execute procedure neon_proc();
\c neondb neon_admin
create or replace function neondb_proc()
returns event_trigger
language plpgsql as
$$
begin
raise notice 'neondb event trigger is executed for %', current_user;
end;
$$;
create or replace function neondb_secdef_proc()
returns event_trigger
language plpgsql
SECURITY DEFINER
as
$$
begin
raise notice 'neondb secdef event trigger is executed for %', current_user;
end;
$$;
-- neon_admin (neon_superuser member) should be able to create event triggers
create event trigger on_ddl3 on ddl_command_end
execute procedure neondb_proc();
create event trigger on_ddl4 on ddl_command_end
execute procedure neondb_secdef_proc();
-- Check that event trigger is fired for neon_admin
create table t1(x integer);
NOTICE: neondb event trigger is executed for neon_admin
NOTICE: neondb secdef event trigger is executed for neon_admin
-- Check that event trigger can be skipped
set neon.event_triggers to false;
create table t2(x integer);
WARNING: Skipping Event Trigger: neon.event_triggers is false
WARNING: Skipping Event Trigger: neon.event_triggers is false
\c regression cloud_admin
-- Check that event triggers are not fired for superuser
create table t3(x integer);
NOTICE: admin event trigger is executed for cloud_admin
WARNING: Skipping Event Trigger
DETAIL: Event Trigger function "neon_proc" is owned by non-superuser role "neon_admin", and current_user "cloud_admin" is superuser
\c neondb cloud_admin
-- Check that user-defined event triggers are not fired for superuser
create table t4(x integer);
WARNING: Skipping Event Trigger
DETAIL: Event Trigger function "neondb_proc" is owned by non-superuser role "neon_admin", and current_user "cloud_admin" is superuser
WARNING: Skipping Event Trigger
DETAIL: Event Trigger function "neondb_secdef_proc" is owned by non-superuser role "neon_admin", and current_user "cloud_admin" is superuser
\c neondb neon_admin
-- Check that neon_admin can drop event triggers
drop event trigger on_ddl3;
drop event trigger on_ddl4;

Some files were not shown because too many files have changed in this diff Show More