Compare commits

..

1 Commits

Author SHA1 Message Date
Tristan Partin
32ea0d3a11 Blha 2025-03-31 15:58:24 -05:00
295 changed files with 3801 additions and 5386 deletions

View File

@@ -8,7 +8,6 @@ self-hosted-runner:
- small-arm64
- us-east-2
config-variables:
- AWS_ECR_REGION
- AZURE_DEV_CLIENT_ID
- AZURE_DEV_REGISTRY_NAME
- AZURE_DEV_SUBSCRIPTION_ID
@@ -16,25 +15,23 @@ config-variables:
- AZURE_PROD_REGISTRY_NAME
- AZURE_PROD_SUBSCRIPTION_ID
- AZURE_TENANT_ID
- BENCHMARK_INGEST_TARGET_PROJECTID
- BENCHMARK_LARGE_OLTP_PROJECTID
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB
- DEV_AWS_OIDC_ROLE_ARN
- DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
- HETZNER_CACHE_BUCKET
- HETZNER_CACHE_ENDPOINT
- HETZNER_CACHE_REGION
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID
- SLACK_ON_CALL_DEVPROD_STREAM
- SLACK_ON_CALL_QA_STAGING_STREAM
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_RUST_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
- DEV_AWS_OIDC_ROLE_ARN
- BENCHMARK_INGEST_TARGET_PROJECTID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- SLACK_ON_CALL_QA_STAGING_STREAM
- DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_CICD_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- AWS_ECR_REGION
- BENCHMARK_LARGE_OLTP_PROJECTID
- SLACK_ON_CALL_DEVPROD_STREAM
- SLACK_RUST_CHANNEL_ID

View File

@@ -128,49 +128,29 @@ jobs:
- name: Cache postgres v14 build
id: cache_pg_14
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v15 build
id: cache_pg_15
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v16 build
id: cache_pg_16
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v17 build
id: cache_pg_17
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}

View File

@@ -37,14 +37,8 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Cache poetry deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
- uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}

View File

@@ -48,13 +48,8 @@ jobs:
submodules: true
- name: Cache cargo deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src

View File

@@ -5,9 +5,6 @@ on:
github-event-name:
type: string
required: true
github-event-json:
type: string
required: true
outputs:
build-tag:
description: "Tag for the current workflow run"
@@ -30,9 +27,6 @@ on:
release-pr-run-id:
description: "Only available if `run-kind in [storage-release, proxy-release, compute-release]`. Contains the run ID of the `Build and Test` workflow, assuming one with the current commit can be found."
value: ${{ jobs.tags.outputs.release-pr-run-id }}
sha:
description: "github.event.pull_request.head.sha on release PRs, github.sha otherwise"
value: ${{ jobs.tags.outputs.sha }}
permissions: {}
@@ -51,7 +45,6 @@ jobs:
storage: ${{ steps.previous-releases.outputs.storage }}
run-kind: ${{ steps.run-kind.outputs.run-kind }}
release-pr-run-id: ${{ steps.release-pr-run-id.outputs.release-pr-run-id }}
sha: ${{ steps.sha.outputs.sha }}
permissions:
contents: read
steps:
@@ -61,6 +54,10 @@ jobs:
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
- name: Get run kind
id: run-kind
env:
@@ -81,23 +78,6 @@ jobs:
run: |
echo "run-kind=$RUN_KIND" | tee -a $GITHUB_OUTPUT
- name: Get the right SHA
id: sha
env:
SHA: >
${{
contains(fromJSON('["storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), steps.run-kind.outputs.run-kind)
&& fromJSON(inputs.github-event-json).pull_request.head.sha
|| github.sha
}}
run: |
echo "sha=$SHA" | tee -a $GITHUB_OUTPUT
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
ref: ${{ steps.sha.outputs.sha }}
- name: Get build tag
id: build-tag
env:
@@ -163,7 +143,7 @@ jobs:
if: ${{ contains(fromJSON('["storage-release", "compute-release", "proxy-release"]'), steps.run-kind.outputs.run-kind) }}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CURRENT_SHA: ${{ github.sha }}
CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release(-(proxy|compute))?/[0-9]{4}-[0-9]{2}-[0-9]{2}$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
echo "release-pr-run-id=$RELEASE_PR_RUN_ID" | tee -a $GITHUB_OUTPUT

View File

@@ -63,13 +63,8 @@ jobs:
- name: Cache postgres ${{ matrix.postgres-version }} build
id: cache_pg
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/${{ matrix.postgres-version }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -134,25 +129,15 @@ jobs:
- name: Cache postgres v17 build
id: cache_pg
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -218,57 +203,32 @@ jobs:
- name: Cache postgres v14 build
id: cache_pg
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_v15
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_v16
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v17 build
id: cache_pg_v17
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache cargo deps (only for v17)
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src
@@ -278,13 +238,8 @@ jobs:
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

View File

@@ -80,7 +80,6 @@ jobs:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}
build-build-tools-image:
needs: [ check-permissions ]
@@ -249,13 +248,8 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Cache poetry deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
@@ -546,7 +540,6 @@ jobs:
uses: ./.github/workflows/trigger-e2e-tests.yml
with:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}
secrets: inherit
neon-image-arch:
@@ -570,7 +563,6 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
ref: ${{ needs.meta.outputs.sha }}
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
- uses: docker/setup-buildx-action@b5ca514318bd6ebac0fb2aedd5d36ec1b5c232a2 # v3.10.0
@@ -680,7 +672,6 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
ref: ${{ needs.meta.outputs.sha }}
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
- uses: docker/setup-buildx-action@b5ca514318bd6ebac0fb2aedd5d36ec1b5c232a2 # v3.10.0

View File

@@ -55,7 +55,7 @@ jobs:
echo tag=${tag} >> ${GITHUB_OUTPUT}
- name: Test extension upgrade
timeout-minutes: 60
timeout-minutes: 20
env:
NEW_COMPUTE_TAG: latest
OLD_COMPUTE_TAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}

View File

@@ -23,7 +23,7 @@ jobs:
egress-policy: audit
- name: Export Workflow Run for the past 2 hours
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
@@ -43,7 +43,7 @@ jobs:
egress-policy: audit
- name: Export Workflow Run for the past 48 hours
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
@@ -63,7 +63,7 @@ jobs:
egress-policy: audit
- name: Export Workflow Run for the past 30 days
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"

View File

@@ -9,9 +9,6 @@ on:
github-event-name:
type: string
required: true
github-event-json:
type: string
required: true
defaults:
run:
@@ -51,7 +48,6 @@ jobs:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ inputs.github-event-name || github.event_name }}
github-event-json: ${{ inputs.github-event-json || toJSON(github.event) }}
trigger-e2e-tests:
needs: [ meta ]

62
Cargo.lock generated
View File

@@ -148,9 +148,9 @@ dependencies = [
[[package]]
name = "arc-swap"
version = "1.7.1"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "archery"
@@ -1167,6 +1167,18 @@ dependencies = [
"half",
]
[[package]]
name = "cicc"
version = "0.0.0"
dependencies = [
"anyhow",
"clap",
"glob",
"serde",
"toml",
"url",
]
[[package]]
name = "clang-sys"
version = "1.6.1"
@@ -2498,9 +2510,9 @@ dependencies = [
[[package]]
name = "glob"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
[[package]]
name = "governor"
@@ -2809,7 +2821,6 @@ name = "http-utils"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"bytes",
"camino",
"fail",
@@ -2822,7 +2833,6 @@ dependencies = [
"pprof",
"regex",
"routerify",
"rustls 0.23.18",
"rustls-pemfile 2.1.1",
"serde",
"serde_json",
@@ -3861,10 +3871,11 @@ dependencies = [
[[package]]
name = "num-bigint"
version = "0.4.6"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
@@ -3913,10 +3924,11 @@ dependencies = [
[[package]]
name = "num-integer"
version = "0.1.46"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
dependencies = [
"autocfg",
"num-traits",
]
@@ -3945,9 +3957,9 @@ dependencies = [
[[package]]
name = "num-traits"
version = "0.2.19"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
"libm",
@@ -4692,7 +4704,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#394851e467755562b4173ff68f9eb0e7f737be13"
dependencies = [
"base64 0.22.1",
"byteorder",
@@ -4726,7 +4738,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#394851e467755562b4173ff68f9eb0e7f737be13"
dependencies = [
"bytes",
"chrono",
@@ -5360,25 +5372,26 @@ dependencies = [
[[package]]
name = "redis"
version = "0.29.2"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b110459d6e323b7cda23980c46c77157601199c9da6241552b284cd565a7a133"
checksum = "71d64e978fd98a0e6b105d066ba4889a7301fca65aeac850a877d8797343feeb"
dependencies = [
"arc-swap",
"async-trait",
"bytes",
"combine",
"futures-util",
"itoa",
"num-bigint",
"percent-encoding",
"pin-project-lite",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"rustls-pemfile 2.1.1",
"rustls-pki-types",
"ryu",
"sha1_smol",
"socket2",
"tokio",
"tokio-rustls 0.26.0",
"tokio-rustls 0.25.0",
"tokio-util",
"url",
]
@@ -6604,7 +6617,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"camino",
"chrono",
"clap",
"clashmap",
@@ -6648,7 +6660,6 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls 0.26.0",
"tokio-util",
"tracing",
"utils",
@@ -7171,7 +7182,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.10"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#394851e467755562b4173ff68f9eb0e7f737be13"
dependencies = [
"async-trait",
"byteorder",
@@ -7214,14 +7225,15 @@ dependencies = [
"bytes",
"fallible-iterator",
"futures-util",
"log",
"parking_lot 0.12.1",
"phf",
"pin-project-lite",
"postgres-protocol2",
"postgres-types2",
"serde",
"tokio",
"tokio-util",
"tracing",
]
[[package]]

View File

@@ -1,6 +1,7 @@
[workspace]
resolver = "2"
members = [
"cicc",
"compute_tools",
"control_plane",
"control_plane/storcon_cli",
@@ -50,7 +51,7 @@ license = "Apache-2.0"
[workspace.dependencies]
ahash = "0.8"
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.7"
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
flate2 = "1.0.26"
@@ -95,6 +96,7 @@ futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
git-version = "0.3"
glob = "0.3.2"
governor = "0.8"
hashbrown = "0.14"
hashlink = "0.9.1"
@@ -130,7 +132,7 @@ nix = { version = "0.27", features = ["dir", "fs", "process", "socket", "signal"
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.19"
num-traits = "0.2.15"
once_cell = "1.13"
opentelemetry = "0.27"
opentelemetry_sdk = "0.27"
@@ -146,7 +148,7 @@ procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13"
rand = "0.8"
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
@@ -210,7 +212,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["s
try-lock = "0.2.5"
twox-hash = { version = "1.6.3", default-features = false }
typed-json = "0.1"
url = "2.2"
url = { version = "2.2", features = ["serde"] }
urlencoding = "2.1"
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"

12
cicc/Cargo.toml Normal file
View File

@@ -0,0 +1,12 @@
[package]
name = "cicc"
edition = "2024"
license.workspace = true
[dependencies]
anyhow.workspace = true
clap.workspace = true
glob.workspace = true
serde.workspace = true
toml.workspace = true
url.workspace = true

View File

@@ -0,0 +1,22 @@
{
"$id": "extension",
"properties": {
"name": {
"type": "string"
},
"version": {
"type": "string"
},
"build-system": {
"enum": [
"pgxs"
]
},
"tarball": {
"type": "string"
},
"checksum": {
"type": "string"
}
}
}

145
cicc/src/main.rs Normal file
View File

@@ -0,0 +1,145 @@
use std::{io::Write, path::PathBuf};
use anyhow::Result;
use clap::Parser;
use glob::glob;
use serde::Deserialize;
#[derive(Debug, Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
directory: PathBuf,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
enum BuildSystem {
#[serde(rename = "pgxs")]
PGXS,
#[serde(rename = "cmake-ninja")]
CmakeNinja,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ExtensionManifest {
pub name: String,
pub version: String,
pub build_system: BuildSystem,
#[serde(default)]
pub build_arguments: Vec<String>,
pub tarball: String,
pub checksum: String,
pub trusted: bool,
}
impl TryFrom<PathBuf> for ExtensionManifest {
type Error = anyhow::Error;
fn try_from(path: PathBuf) -> std::result::Result<Self, Self::Error> {
let content = std::fs::read_to_string(path)?;
let manifest: ExtensionManifest = toml::from_str(&content)?;
Ok(manifest)
}
}
impl ExtensionManifest {
pub fn compile(self) {
let mut stdout = std::io::stdout().lock();
writeln!(
&mut stdout,
r"FROM build-deps AS {name}-src
ARG PG_VERSION
WORKDIR /ext-src
RUN wget '{tarball}' -O '{name}.tar.gz' && \
echo '{checksum} {name}.tar.gz' | sha256sum --check && \
mkdir '{name}-src' && cd '{name}-src' && tar xzf '../{name}.tar.gz' --strip-components=1 -C .
FROM pg-build AS {name}-build
COPY --from={name}-src /ext-src/ /ext-src/
WORKDIR /ext-src/{name}-src
",
name = self.name,
tarball = self.tarball,
checksum = self.checksum
)
.unwrap();
match self.build_system {
BuildSystem::PGXS => writeln!(
&mut stdout,
r#"RUN make -j "$(getconf _NPROCESSORS_ONLN)" install && \"#,
)
.unwrap(),
BuildSystem::CmakeNinja => writeln!(
&mut stdout,
r"RUN cmake -G Ninja -B build -DCMAKE_BUILD_TYPE=Release {build_args}
ninja -C install && \",
build_args = self.build_arguments.join(" ")
)
.unwrap(),
}
if self.trusted {
writeln!(
&mut stdout,
" echo 'trusted = true' >> '/usr/local/pgsql/share/extension/{name}.control",
name = self.name
)
.unwrap();
} else {
writeln!(&mut stdout, " true").unwrap();
}
write!(&mut stdout, "\n").unwrap();
}
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct DependencyManifest {}
fn main() -> Result<()> {
let cli = Cli::parse();
if !cli.directory.exists() {
eprintln!("Directory does not exist");
std::process::exit(1);
}
for entry in glob(cli.directory.join("*.toml").to_str().unwrap()).unwrap() {
let path = match entry {
Ok(entry) => entry,
Err(_) => continue,
};
let manifest: ExtensionManifest = match path.clone().try_into() {
Ok(manifest) => manifest,
Err(e) => {
eprintln!("Failed to read {}: {}", path.display(), e);
std::process::exit(1);
}
};
manifest.compile();
}
Ok(())
}
#[cfg(test)]
mod test {
use clap::CommandFactory;
use super::Cli;
#[test]
fn verify_cli() {
Cli::command().debug_assert()
}
}

View File

@@ -1916,30 +1916,26 @@ RUN apt update && \
;; \
esac && \
apt install --no-install-recommends -y \
ca-certificates \
gdb \
iproute2 \
liblz4-1 \
libreadline8 \
libboost-iostreams1.74.0 \
libboost-regex1.74.0 \
libboost-serialization1.74.0 \
libboost-system1.74.0 \
libcurl4 \
libevent-2.1-7 \
libgeos-c1v5 \
liblz4-1 \
libossp-uuid16 \
libgeos-c1v5 \
libprotobuf-c1 \
libreadline8 \
libsfcgal1 \
libxml2 \
libxslt1.1 \
libzstd1 \
libcurl4 \
libevent-2.1-7 \
locales \
lsof \
procps \
ca-certificates \
rsyslog \
screen \
tcpdump \
$VERSION_INSTALLS && \
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -0,0 +1,20 @@
name: ip4r
build-system: "pgxs"
trusted: true
pg14: &pg14
version: "2.10.1"
tarball: "https://github.com/timescale/timescaledb/archive/refs/tags/{version}.tar.gz"
checksum: 6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73
pg15:
<<: *pg14
pg16:
<<: *pg14
pg17:
<<: *pg14

View File

@@ -0,0 +1,9 @@
name = "ip4r"
version = "2.4.2"
trusted = true
build-system = "pgxs"
tarball = "https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz"
checksum = "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b"

View File

@@ -0,0 +1,9 @@
name = "ip4r"
version = "2.4.2"
trusted = true
build-system = "pgxs"
tarball = "https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz"
checksum = "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b"

View File

@@ -0,0 +1,9 @@
name = "ip4r"
version = "2.4.2"
trusted = true
build-system = "pgxs"
tarball = "https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz"
checksum = "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b"

View File

@@ -0,0 +1,9 @@
name = "ip4r"
version = "2.4.2"
trusted = true
build-system = "pgxs"
tarball = "https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz"
checksum = "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b"

View File

@@ -0,0 +1,14 @@
name = "timescaledb"
version = "2.17.1"
trusted = true
build-system = "cmake-ninja"
build-arguments = [
"-DSEND_TELEMETRY_DEFAULT:BOOL=OFF",
"-DUSE_TELEMETRY:BOOL=OFF",
"-DAPACHE_ONLY:BOOL=ON",
]
tarball = "https://github.com/timescale/timescaledb/archive/refs/tags/2.17.1.tar.gz"
checksum = "6277cf43f5695e23dae1c5cfeba00474d730b66ed53665a84b787a6bb1a57e28"

View File

@@ -45,9 +45,7 @@ use anyhow::{Context, Result};
use clap::Parser;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{
BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
};
use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal};
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;
@@ -59,6 +57,10 @@ use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
@@ -145,7 +147,7 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
runtime.block_on(init())?;
let build_tag = runtime.block_on(init())?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -172,6 +174,8 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
build_tag,
live_config_allowed: cli_spec.live_config_allowed,
},
cli_spec.spec,
@@ -185,7 +189,7 @@ fn main() -> Result<()> {
deinit_and_exit(exit_code);
}
async fn init() -> Result<()> {
async fn init() -> Result<String> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
@@ -195,9 +199,12 @@ async fn init() -> Result<()> {
}
});
info!("compute build_tag: {}", &BUILD_TAG.to_string());
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
info!("build_tag: {build_tag}");
Ok(())
Ok(build_tag)
}
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {

View File

@@ -31,7 +31,6 @@ use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use compute_tools::extension_server::{PostgresMajorVersion, get_pg_version};
use nix::unistd::Pid;
use std::ops::Not;
use tracing::{Instrument, error, info, info_span, warn};
use utils::fs_ext::is_directory_empty;
@@ -45,7 +44,7 @@ mod s3_uri;
const PG_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600);
const PG_WAIT_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(300);
#[derive(Subcommand, Debug, Clone, serde::Serialize)]
#[derive(Subcommand, Debug)]
enum Command {
/// Runs local postgres (neon binary), restores into it,
/// uploads pgdata to s3 to be consumed by pageservers
@@ -85,15 +84,6 @@ enum Command {
},
}
impl Command {
fn as_str(&self) -> &'static str {
match self {
Command::Pgdata { .. } => "pgdata",
Command::DumpRestore { .. } => "dump-restore",
}
}
}
#[derive(clap::Parser)]
struct Args {
#[clap(long, env = "NEON_IMPORTER_WORKDIR")]
@@ -447,7 +437,7 @@ async fn run_dump_restore(
#[allow(clippy::too_many_arguments)]
async fn cmd_pgdata(
s3_client: Option<&aws_sdk_s3::Client>,
s3_client: Option<aws_sdk_s3::Client>,
kms_client: Option<aws_sdk_kms::Client>,
maybe_s3_prefix: Option<s3_uri::S3Uri>,
maybe_spec: Option<Spec>,
@@ -516,14 +506,14 @@ async fn cmd_pgdata(
if let Some(s3_prefix) = maybe_s3_prefix {
info!("upload pgdata");
aws_s3_sync::upload_dir_recursive(
s3_client.unwrap(),
s3_client.as_ref().unwrap(),
Utf8Path::new(&pgdata_dir),
&s3_prefix.append("/pgdata/"),
)
.await
.context("sync dump directory to destination")?;
info!("write pgdata status to s3");
info!("write status");
{
let status_dir = workdir.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
@@ -560,15 +550,13 @@ async fn cmd_dumprestore(
&key_id,
spec.source_connstring_ciphertext_base64,
)
.await
.context("decrypt source connection string")?;
.await?;
let dest = if let Some(dest_ciphertext) =
spec.destination_connstring_ciphertext_base64
{
decode_connstring(kms_client.as_ref().unwrap(), &key_id, dest_ciphertext)
.await
.context("decrypt destination connection string")?
.await?
} else {
bail!(
"destination connection string must be provided in spec for dump_restore command"
@@ -613,18 +601,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
// Initialize AWS clients only if s3_prefix is specified
let (s3_client, kms_client) = if args.s3_prefix.is_some() {
// Create AWS config with enhanced retry settings
let config = aws_config::defaults(BehaviorVersion::v2024_03_28())
.retry_config(
aws_config::retry::RetryConfig::standard()
.with_max_attempts(5) // Retry up to 5 times
.with_initial_backoff(std::time::Duration::from_millis(200)) // Start with 200ms delay
.with_max_backoff(std::time::Duration::from_secs(5)), // Cap at 5 seconds
)
.load()
.await;
// Create clients from the config with enhanced retry settings
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
let s3_client = aws_sdk_s3::Client::new(&config);
let kms = aws_sdk_kms::Client::new(&config);
(Some(s3_client), Some(kms))
@@ -632,108 +609,79 @@ pub(crate) async fn main() -> anyhow::Result<()> {
(None, None)
};
// Capture everything from spec assignment onwards to handle errors
let res = async {
let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
let spec_key = s3_prefix.append("/spec.json");
let object = s3_client
.as_ref()
.unwrap()
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
.send()
.await
.context("get spec from s3")?
.body
.collect()
.await
.context("download spec body")?;
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
} else {
None
};
let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
let spec_key = s3_prefix.append("/spec.json");
let object = s3_client
.as_ref()
.unwrap()
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
.send()
.await
.context("get spec from s3")?
.body
.collect()
.await
.context("download spec body")?;
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
} else {
None
};
match tokio::fs::create_dir(&args.working_directory).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if !is_directory_empty(&args.working_directory)
.await
.context("check if working directory is empty")?
{
bail!("working directory is not empty");
} else {
// ok
}
match tokio::fs::create_dir(&args.working_directory).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if !is_directory_empty(&args.working_directory)
.await
.context("check if working directory is empty")?
{
bail!("working directory is not empty");
} else {
// ok
}
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
}
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
}
match args.command.clone() {
Command::Pgdata {
match args.command {
Command::Pgdata {
source_connection_string,
interactive,
pg_port,
num_cpus,
memory_mb,
} => {
cmd_pgdata(
s3_client,
kms_client,
args.s3_prefix,
spec,
source_connection_string,
interactive,
pg_port,
args.working_directory,
args.pg_bin_dir,
args.pg_lib_dir,
num_cpus,
memory_mb,
} => {
cmd_pgdata(
s3_client.as_ref(),
kms_client,
args.s3_prefix.clone(),
spec,
source_connection_string,
interactive,
pg_port,
args.working_directory.clone(),
args.pg_bin_dir,
args.pg_lib_dir,
num_cpus,
memory_mb,
)
.await
}
Command::DumpRestore {
)
.await?;
}
Command::DumpRestore {
source_connection_string,
destination_connection_string,
} => {
cmd_dumprestore(
kms_client,
spec,
source_connection_string,
destination_connection_string,
} => {
cmd_dumprestore(
kms_client,
spec,
source_connection_string,
destination_connection_string,
args.working_directory.clone(),
args.pg_bin_dir,
args.pg_lib_dir,
)
.await
}
}
}
.await;
if let Some(s3_prefix) = args.s3_prefix {
info!("write job status to s3");
{
let status_dir = args.working_directory.join("status");
if std::fs::exists(&status_dir)?.not() {
std::fs::create_dir(&status_dir).context("create status directory")?;
}
let status_file = status_dir.join("fast_import");
let res_obj = match res {
Ok(_) => serde_json::json!({"command": args.command.as_str(), "done": true}),
Err(err) => {
serde_json::json!({"command": args.command.as_str(), "done": false, "error": err.to_string()})
}
};
std::fs::write(&status_file, res_obj.to_string()).context("write status file")?;
aws_s3_sync::upload_dir_recursive(
s3_client.as_ref().unwrap(),
&status_dir,
&s3_prefix.append("/status/"),
args.working_directory,
args.pg_bin_dir,
args.pg_lib_dir,
)
.await
.context("sync status directory to destination")?;
.await?;
}
}

View File

@@ -20,7 +20,6 @@ use futures::future::join_all;
use futures::stream::FuturesUnordered;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
@@ -36,7 +35,6 @@ use crate::disk_quota::set_disk_quota;
use crate::installed_extensions::get_installed_extensions;
use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
use crate::monitor::launch_monitor;
use crate::pg_helpers::*;
use crate::rsyslog::{
@@ -51,17 +49,6 @@ use crate::{config, extension_server, local_proxy};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
// This is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
/// Build tag/version of the compute node binaries/image. It's tricky and ugly
/// to pass it everywhere as a part of `ComputeNodeParams`, so we use a
/// global static variable.
pub static BUILD_TAG: Lazy<String> = Lazy::new(|| {
option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string()
});
/// Static configuration params that don't change after startup. These mostly
/// come from the CLI args, or are derived from them.
@@ -85,6 +72,7 @@ pub struct ComputeNodeParams {
pub pgdata: String,
pub pgbin: String,
pub pgversion: String,
pub build_tag: String,
/// The port that the compute's external HTTP server listens on
pub external_http_port: u16,
@@ -185,11 +173,6 @@ impl ComputeState {
info!("Changing compute status from {} to {}", prev, status);
self.status = status;
state_changed.notify_all();
COMPUTE_CTL_UP.reset();
COMPUTE_CTL_UP
.with_label_values(&[&BUILD_TAG, format!("{}", status).as_str()])
.set(1);
}
pub fn set_failed_status(&mut self, err: anyhow::Error, state_changed: &Condvar) {
@@ -369,19 +352,13 @@ impl ComputeNode {
}
.launch(&this);
// The internal HTTP server is needed for a further activation by control plane
// if compute was started for a pool, so we have to start server before hanging
// waiting for a spec.
// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
crate::http::server::Server::Internal {
port: this.params.internal_http_port,
}
.launch(&this);
// HTTP server is running, so we can officially declare compute_ctl as 'up'
COMPUTE_CTL_UP
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
.set(1);
// If we got a spec from the CLI already, use that. Otherwise wait for the
// control plane to pass it to us with a /configure HTTP request
let pspec = if let Some(cli_spec) = cli_spec {
@@ -901,14 +878,6 @@ impl ComputeNode {
info!("Storage auth token not set");
}
config.application_name("compute_ctl");
if let Some(spec) = &compute_state.pspec {
config.options(&format!(
"-c neon.compute_mode={}",
spec.spec.mode.to_type_str()
));
}
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
@@ -2055,8 +2024,12 @@ LIMIT 100",
let mut download_tasks = Vec::new();
for library in &libs_vec {
let (ext_name, ext_path) =
remote_extensions.get_ext(library, true, &BUILD_TAG, &self.params.pgversion)?;
let (ext_name, ext_path) = remote_extensions.get_ext(
library,
true,
&self.params.build_tag,
&self.params.pgversion,
)?;
download_tasks.push(self.download_extension(ext_name, ext_path));
}
let results = join_all(download_tasks).await;

View File

@@ -117,7 +117,6 @@ pub fn write_postgres_conf(
writeln!(file, "lc_numeric='C.UTF-8'")?;
}
writeln!(file, "neon.compute_mode={}", spec.mode.to_type_str())?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Static(lsn) => {

View File

@@ -5,7 +5,7 @@ use axum::response::{IntoResponse, Response};
use http::StatusCode;
use serde::Deserialize;
use crate::compute::{BUILD_TAG, ComputeNode};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
use crate::http::extract::{Path, Query};
@@ -47,7 +47,7 @@ pub(in crate::http) async fn download_extension(
remote_extensions.get_ext(
&filename,
ext_server_params.is_library,
&BUILD_TAG,
&compute.params.build_tag,
&compute.params.pgversion,
)
};

View File

@@ -1,8 +1,7 @@
use metrics::core::{AtomicF64, Collector, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
register_int_gauge_vec, register_uint_gauge_vec,
IntCounterVec, UIntGaugeVec, register_gauge, register_int_counter_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -71,19 +70,8 @@ pub(crate) static AUDIT_LOG_DIR_SIZE: Lazy<GenericGauge<AtomicF64>> = Lazy::new(
.expect("failed to define a metric")
});
// Report that `compute_ctl` is up and what's the current compute status.
pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"compute_ctl_up",
"Whether compute_ctl is running",
&["build_tag", "status"]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
let mut metrics = INSTALLED_EXTENSIONS.collect();
metrics.extend(CPLANE_REQUESTS_TOTAL.collect());
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());

View File

@@ -165,11 +165,8 @@ pub struct NeonStorageControllerConf {
/// Database url used when running multiple storage controller instances
pub database_url: Option<SocketAddr>,
/// Thresholds for auto-splitting a tenant into shards.
/// Threshold for auto-splitting a tenant into shards
pub split_threshold: Option<u64>,
pub max_split_shards: Option<u8>,
pub initial_split_threshold: Option<u64>,
pub initial_split_shards: Option<u8>,
pub max_secondary_lag_bytes: Option<u64>,
@@ -184,8 +181,6 @@ pub struct NeonStorageControllerConf {
pub timelines_onto_safekeepers: bool,
pub use_https_safekeeper_api: bool,
pub use_local_compute_notifications: bool,
}
impl NeonStorageControllerConf {
@@ -206,16 +201,12 @@ impl Default for NeonStorageControllerConf {
start_as_candidate: false,
database_url: None,
split_threshold: None,
max_split_shards: None,
initial_split_threshold: None,
initial_split_shards: None,
max_secondary_lag_bytes: None,
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
long_reconcile_threshold: None,
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}
}
}

View File

@@ -51,19 +51,11 @@ impl PageServerNode {
parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let ssl_ca_cert = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
Certificate::from_pem(&buf).expect("CA certificate should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("Client constructs with no errors");
let endpoint = if env.storage_controller.use_https_pageserver_api {
format!(
"https://{}",
@@ -80,7 +72,6 @@ impl PageServerNode {
conf: conf.clone(),
env: env.clone(),
http_client: mgmt_api::Client::new(
http_client,
endpoint,
{
match conf.http_auth_type {
@@ -92,7 +83,9 @@ impl PageServerNode {
}
}
.as_deref(),
),
ssl_ca_cert,
)
.expect("Client constructs with no errors"),
}
}
@@ -149,10 +142,6 @@ impl PageServerNode {
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
overrides.push(format!("ssl_ca_file='{}'", ssl_ca_file.to_str().unwrap()));
}
// Apply the user-provided overrides
overrides.push({
let mut doc =
@@ -428,6 +417,11 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'l0_flush_delay_threshold' as an integer")?,
l0_flush_wait_upload: settings
.remove("l0_flush_wait_upload")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'l0_flush_wait_upload' as a boolean")?,
l0_flush_stall_threshold: settings
.remove("l0_flush_stall_threshold")
.map(|x| x.parse::<usize>())

View File

@@ -1,5 +1,6 @@
use std::ffi::OsStr;
use std::fs;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::ExitStatus;
use std::str::FromStr;
@@ -17,7 +18,7 @@ use pageserver_api::models::{TenantConfigRequest, TimelineCreateRequest, Timelin
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::{Certificate, Method};
use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -37,9 +38,9 @@ pub struct StorageController {
client: reqwest::Client,
config: NeonStorageControllerConf,
// The listen port is learned when starting the storage controller,
// The listen addresses is learned when starting the storage controller,
// hence the use of OnceLock to init it at the right time.
listen_port: OnceLock<u16>,
listen: OnceLock<SocketAddr>,
}
const COMMAND: &str = "storage_controller";
@@ -143,26 +144,15 @@ impl StorageController {
}
};
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("HTTP client should construct with no error");
Self {
env: env.clone(),
private_key,
public_key,
client: http_client,
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
config: env.storage_controller.clone(),
listen_port: OnceLock::default(),
listen: OnceLock::default(),
}
}
@@ -347,34 +337,34 @@ impl StorageController {
}
}
if self.env.generate_local_ssl_certs {
self.env.generate_ssl_cert(
&instance_dir.join("server.crt"),
&instance_dir.join("server.key"),
)?;
}
let (listen, postgres_port) = {
if let Some(base_port) = start_args.base_port {
(
format!("127.0.0.1:{base_port}"),
self.config
.database_url
.expect("--base-port requires NeonStorageControllerConf::database_url")
.port(),
)
} else {
let listen_url = self.env.control_plane_api.clone();
let listen_url = &self.env.control_plane_api;
let listen = format!(
"{}:{}",
listen_url.host_str().unwrap(),
listen_url.port().unwrap()
);
let scheme = listen_url.scheme();
let host = listen_url.host_str().unwrap();
let (listen_port, postgres_port) = if let Some(base_port) = start_args.base_port {
(
base_port,
self.config
.database_url
.expect("--base-port requires NeonStorageControllerConf::database_url")
.port(),
)
} else {
let port = listen_url.port().unwrap();
(port, port + 1)
(listen, listen_url.port().unwrap() + 1)
}
};
self.listen_port
.set(listen_port)
.expect("StorageController::listen_port is only set here");
let socket_addr = listen
.parse()
.expect("listen address is a valid socket address");
self.listen
.set(socket_addr)
.expect("StorageController::listen is only set here");
// Do we remove the pid file on stop?
let pg_started = self.is_postgres_running().await?;
@@ -510,15 +500,20 @@ impl StorageController {
drop(client);
conn.await??;
let addr = format!("{}:{}", host, listen_port);
let listen = self
.listen
.get()
.expect("cell is set earlier in this function");
let address_for_peers = Uri::builder()
.scheme(scheme)
.authority(addr.clone())
.scheme("http")
.authority(format!("{}:{}", listen.ip(), listen.port()))
.path_and_query("")
.build()
.unwrap();
let mut args = vec![
"-l",
&listen.to_string(),
"--dev",
"--database-url",
&database_url,
@@ -535,14 +530,6 @@ impl StorageController {
.map(|s| s.to_string())
.collect::<Vec<_>>();
match scheme {
"http" => args.extend(["--listen".to_string(), addr]),
"https" => args.extend(["--listen-https".to_string(), addr]),
_ => {
panic!("Unexpected url scheme in control_plane_api: {scheme}");
}
}
if self.config.start_as_candidate {
args.push("--start-as-candidate".to_string());
}
@@ -555,10 +542,6 @@ impl StorageController {
args.push("--use-https-safekeeper-api".to_string());
}
if self.config.use_local_compute_notifications {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}
@@ -587,20 +570,6 @@ impl StorageController {
args.push(format!("--split-threshold={split_threshold}"))
}
if let Some(max_split_shards) = self.config.max_split_shards.as_ref() {
args.push(format!("--max-split-shards={max_split_shards}"))
}
if let Some(initial_split_threshold) = self.config.initial_split_threshold.as_ref() {
args.push(format!(
"--initial-split-threshold={initial_split_threshold}"
))
}
if let Some(initial_split_shards) = self.config.initial_split_shards.as_ref() {
args.push(format!("--initial-split-shards={initial_split_shards}"))
}
if let Some(lag) = self.config.max_secondary_lag_bytes.as_ref() {
args.push(format!("--max-secondary-lag-bytes={lag}"))
}
@@ -621,8 +590,6 @@ impl StorageController {
args.push("--timelines-onto-safekeepers".to_string());
}
println!("Starting storage controller");
background_process::start_process(
COMMAND,
&instance_dir,
@@ -749,26 +716,30 @@ impl StorageController {
{
// In the special case of the `storage_controller start` subcommand, we wish
// to use the API endpoint of the newly started storage controller in order
// to pass the readiness check. In this scenario [`Self::listen_port`] will
// be set (see [`Self::start`]).
// to pass the readiness check. In this scenario [`Self::listen`] will be set
// (see [`Self::start`]).
//
// Otherwise, we infer the storage controller api endpoint from the configured
// control plane API.
let port = if let Some(port) = self.listen_port.get() {
*port
let url = if let Some(socket_addr) = self.listen.get() {
Url::from_str(&format!(
"http://{}:{}/{path}",
socket_addr.ip().to_canonical(),
socket_addr.port()
))
.unwrap()
} else {
self.env.control_plane_api.port().unwrap()
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let listen_url = self.env.control_plane_api.clone();
Url::from_str(&format!(
"http://{}:{}/{path}",
listen_url.host_str().unwrap(),
listen_url.port().unwrap()
))
.unwrap()
};
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let url = Url::from_str(&format!(
"{}://{}:{port}/{path}",
self.env.control_plane_api.scheme(),
self.env.control_plane_api.host_str().unwrap(),
))
.unwrap();
let mut builder = self.client.request(method, url);
if let Some(body) = body {
builder = builder.json(&body)

View File

@@ -20,7 +20,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Certificate, Method, StatusCode, Url};
use reqwest::{Method, StatusCode, Url};
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -274,7 +274,7 @@ struct Cli {
jwt: Option<String>,
#[arg(long)]
/// Trusted root CA certificates to use in https APIs.
/// Trusted root CA certificate to use in https APIs.
ssl_ca_file: Option<PathBuf>,
#[command(subcommand)]
@@ -387,23 +387,17 @@ async fn main() -> anyhow::Result<()> {
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
let ssl_ca_certs = match &cli.ssl_ca_file {
let ssl_ca_cert = match &cli.ssl_ca_file {
Some(ssl_ca_file) => {
let buf = tokio::fs::read(ssl_ca_file).await?;
Certificate::from_pem_bundle(&buf)?
Some(reqwest::Certificate::from_pem(&buf)?)
}
None => Vec::new(),
None => None,
};
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;
let mut trimmed = cli.api.to_string();
trimmed.pop();
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref(), ssl_ca_cert)?;
match cli.command {
Command::NodeRegister {

View File

@@ -1,196 +0,0 @@
## Summary
This is a retrospective RFC to document the design of the `storage-controller` service.
This service manages the physical mapping of Tenants and Timelines to Pageservers and Safekeepers. It
acts as the API for "storage" as an abstract concept: enabling other parts of the system to reason
about things like creating/deleting tenants and timelines without having to understand exactly which
pageserver and safekeeper to communicate, or any subtle rules about how to orchestrate these things.
The storage controller was implemented in the first half of 2024 as an essential part
of storage sharding, especially [shard splitting](032-shard-splitting.md).
It initially managed only pageservers, but has extended in 2025 to also manage safekeepers. In
some places you may seen unqualified references to 'nodes' -- those are pageservers.
## Design Choices
### Durability
We rely on an external postgres for all durable state. No local storage is used.
We avoid any unnecessary I/O to durable storage. For example:
- most tracking of in-flight changes to the system is done in-memory rather than recording progress/steps in a database
- When migrating tenant shards between pageservers we only touch the database to increment generation numbers,
we do not persist the total state of a tenant shard.
Being frugal with database I/O has two benefits:
- It avoids the database becoming a practical scaling bottleneck (we expect in-memory scale issues to be hit
before we hit e.g. transactions-per-second issues)
- It reduces cost when using a cloud database service to run the controller's postgres database.
The trade-off is that there is a "bootstrapping" problem: a controller can't be deployed in isolation, one
must first have some existing database system. In practice, we expect that Neon is deployed in one of the
following ways:
- into a cloud which has a postgres service that can be used to run the controller
- into a mature on-prem environment that has existing facilities for running databases
- into a test/dev environment where a simple one-node vanilla postgres installation is sufficient
### Consensus
The controller does _not_ implement any strong consensus mechanism of its own. Instead:
- Where strong consistency is required (for example, for pageserver generation numbers), this
responsibility is delegated to a transaction in our postgres database.
- Highly available deploys are done using a simple in-database record of what controller instances
are available, distinguished by timestamps, rather than having controllers directly negotiate a leader.
Avoiding strong consensus among controller processes is a cost saving (we avoid running three controllers
all the time), and simplifies implementation (we do not have to phrase all configuration changes as e.g raft
transactions).
The trade-off is that under some circumstances a controller with partial network isolation can cause availability
issues in the cluster, by making changes to pageserver state that might disagree with what the "true" active
controller is trying to do. The impact of this is bounded by our `controllers` database table, that enables
a rogue node to eventually realise that it is not the leader and step down. If a rogue node can't reach
the database, then it implicitly stops making progress. A rogue controller cannot durably damage the system
because pageserver data and safekeeper configs are protected by generation numbers that are only updated
via postgres transactions (i.e. no controller "trusts itself" to independently make decisions about generations).
### Scale
We design for high but not unlimited scale. The memory footprint of each tenant shard is small (~8kB), so
it is realistic to scale up to a million attached shards on a server with modest resources. Tenants in
a detached state (i.e. not active on pageservers) do not need to be managed by storage controller, and can
be relegated from memory to the database.
Typically, a tenant shard is updated about once a week, when we do a deploy. During deploys, we relocate
a few thousand tenants from each pageserver while it is restarted, so it is extremely rare for the controller
to have to do O(N) work (on all shards at once).
There are places where we do O(N) work:
- On normal startup, when loading from the database into memory
- On unclean startup (with no handover of observed state from a previous controller), where we will
scan all shards on all pageservers.
It is important that these locations are written efficiently. At high scale we should still expect runtimes
of the order tens of seconds to complete a storage controller start.
When the practical scale limit of a single storage controller is reached, just deploy another one with its
own pageservers & safekeepers: each controller+its storage servers should be thought of as a logical cluster
or "cell" of storage.
# High Level Design
The storage controller is an in-memory system (i.e. state for all attached
tenants is held in memory _as well as_ being represented in durable postgres storage).
## Infrastructure
The storage controller is an async rust binary using tokio.
The storage controller is built around the `Service` type. This implements
all the entry points for the outside world's interaction with the controller (HTTP handlers are mostly thin wrappers of service functions),
and holds most in-memory state (e.g. the list of tenant shards).
The state is held in a `ServiceInner` wrapped in a RwLock. This monolithic
lock is used to simplify reasoning about code that mutates state: each function that takes a write lock may be thought of as a serializable transaction on the in-memory state. This lock is clearly a bottleneck, but
nevertheless is scalable to managing millions of tenants.
Persistent state is held in a postgres database, and we use the `diesel` crate to provide database client functionality. All database access is wrapped in the `Persistence` type -- this makes it easy to understand which
code is touching the database. The database is only used when necessary, i.e. for state that cannot be recovered another way. For example, we do not store the secondary pageserver locations of tenant shards in the database, rather we learn these at startup from running pageservers, and/or make scheduling decisions to fill in the gaps. This adds some complexity, but massively reduces the load on the database, and enables running the storage controller with a very cheap postgres instance.
## Pageserver tenant scheduling & reconciliation
### Intent & observed state
Each tenant shard is represented by type `TenantShard`, which has an 'intent' and 'observed' state. Setting the
intent state is called _scheduling_, and doing remote I/O to make observed
state match intent state is called _reconciliation_.
The `Scheduler` type is responsible for making choices about the intent
state, such as choosing a pageserver for a new tenant shard, or assigning
a replacement pageserver when the original one fails.
The observed state is updated after tenant reconciliation (see below), and
has the concept of a `None` state for a pageserver, indicating unknown state. This is used to ensure that we can safely clean up after we start
but do not finish a remote call to a pageserver, or if a pageserver restarts and we are uncertain of its state.
### Tenant Reconciliation
The `Reconciler` type is responsible for updating pageservers to achieve
the intent state. It is instantiated when `Service` determines that a shard requires reconciliation, and owned by a background tokio task that
runs it to completion. Reconciler does not have access to the `Service` state: it is populated with a snapshot of relevant information when constructed, and submits is results to a channel that `Service` consumes
to update the tenant shard's observed state.
The Reconciler does have access to the database, but only uses it for
a single purpose: updating shards' generation numbers immediately before
attaching them to a pageserver.
Operations that change a tenant's scheduling will spawn a reconciler if
necessary, and there is also a background loop which checks every shard
for the need to reconcile -- this background loop ensures eventual progress
if some earlier reconciliations failed for some reason.
The reconciler has a general purpose code path which will attach/detach from pageservers as necessary, and a special case path for live migrations. The live migration case is more common in practice, and is taken whenever the current observed state indicates that we have a healthy attached location to migrate from. This implements live migration as described in the earlier [live migration RFC](028-pageserver-migration.md).
### Scheduling optimisation
During the periodic background reconciliation loop, the controller also
performance _scheduling optimization_. This is the process of looking for
shards that are in sub-optimal locations, and moving them.
Typically, this means:
- Shards attached outside their preferred AZ (e.g. after a node failure), to migrate them back to their preferred AZ
- Shards attached on the same pageserver as some other shards in the same
tenant, to migrate them elsewhere (e.g. after a shard split)
Scheduling optimisation is a multi-step process to ensure graceful cutovers, e.g. by creating new secondary location, waiting for it to
warm up, then cutting over. This is not done as an explicit queue
of operations, but rather by iteratively calling the optimisation
function, which will recognise each intervening state as something
that can generate the next optimisation.
### Pageserver heartbeats and failure
The `Heartbeater` type is responsible for detecting when a pageserver
becomes unavailable. This is fed back into `Service` for action: when
a pageserver is marked unavailable, tenant shards on that pageserver are
rescheduled and Reconcilers are spawned to cut them over to their new location.
## Pageserver timeline CRUD operations
By CRUD operations, we mean creating and deleting timelines. The authoritative storage for which timelines exist on the pageserver
is in S3, and is governed by the pageserver's system of generation
numbers. Because a shard can be attached to multiple pageservers
concurrently, we need to handle this when doing timeline CRUD operations:
- A timeline operation is only persistent if _after_ the ack from a pageserver, that pageserver's generation is still the latest.
- For deletions in particular, they are only persistent if _all_ attached
locations have acked the deletion operation, since if only the latest one
has acked then the timeline could still return from the dead if some old-generation attachment writes an index for it.
## Zero-downtime controller deployments
When two storage controllers run at the same time, they coordinate via
the database to establish one leader, and the other controller may proxy
requests to this leader
See [Storage controller restarts RFC](037-storage-controller-restarts.md).
Note that this is not a strong consensus mechanism: the controller must also survive split-brain situations. This is respected by code that
e.g. increments version numbers, which uses database transactions that
check the expected value before modifying it. A split-brain situation can
impact availability (e.g. if two controllers are fighting over where to
attach a shard), but it should never impact durability and data integrity.
## Graceful drain & fill of pageservers during deploys
The storage controller has functionality for draining + filling pageservers
while deploying new pageserver binaries, so that clients are not actively
using a pageserver while it restarts.
See [Graceful restarts RFC](033-storage-controller-drain-and-fill.md)
## Safekeeper timeline scheduling
This is currently under development, see [Safekeeper dynamic membership change RFC](035-safekeeper-dynamic-membership-change.md).

View File

@@ -275,18 +275,6 @@ pub enum ComputeMode {
Replica,
}
impl ComputeMode {
/// Convert the compute mode to a string that can be used to identify the type of compute,
/// which means that if it's a static compute, the LSN will not be included.
pub fn to_type_str(&self) -> &'static str {
match self {
ComputeMode::Primary => "primary",
ComputeMode::Static(_) => "static",
ComputeMode::Replica => "replica",
}
}
}
/// Log level for audit logging
/// Disabled, log, hipaa
/// Default is Disabled

View File

@@ -6,7 +6,6 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
bytes.workspace = true
camino.workspace = true
fail.workspace = true
@@ -19,15 +18,14 @@ pprof.workspace = true
regex.workspace = true
routerify.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_path_to_error.workspace = true
serde.workspace = true
thiserror.workspace = true
tracing.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
url.workspace = true
uuid.workspace = true

View File

@@ -30,20 +30,6 @@ static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("failed to define a metric")
});
static SERVE_METRICS_COUNT_2: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_2_total",
"Number of metric requests made"
)
.expect("failed to define a metric")
});
static SERVE_METRICS_COUNT_3: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total_3",
"Number of metric requests made"
)
.expect("failed to define a metric")
});
static X_REQUEST_ID_HEADER_STR: &str = "x-request-id";
@@ -265,8 +251,6 @@ impl std::io::Write for ChannelWriter {
pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
SERVE_METRICS_COUNT.inc();
SERVE_METRICS_COUNT_2.inc();
SERVE_METRICS_COUNT_3.inc();
let started_at = std::time::Instant::now();

View File

@@ -1,124 +1,21 @@
use std::{sync::Arc, time::Duration};
use anyhow::Context;
use arc_swap::ArcSwap;
use camino::Utf8Path;
use rustls::{
pki_types::{CertificateDer, PrivateKeyDer},
server::{ClientHello, ResolvesServerCert},
sign::CertifiedKey,
};
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
pub async fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result<Vec<CertificateDer<'static>>> {
let cert_data = tokio::fs::read(filename)
.await
.context(format!("failed reading certificate file {filename:?}"))?;
let mut reader = std::io::Cursor::new(&cert_data);
pub fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result<Vec<CertificateDer<'static>>> {
let file = std::fs::File::open(filename)?;
let mut reader = std::io::BufReader::new(file);
let cert_chain = rustls_pemfile::certs(&mut reader)
.collect::<Result<Vec<_>, _>>()
.context(format!("failed parsing certificate from file {filename:?}"))?;
Ok(cert_chain)
Ok(rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?)
}
pub async fn load_private_key(filename: &Utf8Path) -> anyhow::Result<PrivateKeyDer<'static>> {
let key_data = tokio::fs::read(filename)
.await
.context(format!("failed reading private key file {filename:?}"))?;
let mut reader = std::io::Cursor::new(&key_data);
pub fn load_private_key(filename: &Utf8Path) -> anyhow::Result<PrivateKeyDer<'static>> {
let file = std::fs::File::open(filename)?;
let mut reader = std::io::BufReader::new(file);
let key = rustls_pemfile::private_key(&mut reader)
.context(format!("failed parsing private key from file {filename:?}"))?;
let key = rustls_pemfile::private_key(&mut reader)?;
key.ok_or(anyhow::anyhow!(
"no private key found in {}",
filename.as_str(),
))
}
pub async fn load_certified_key(
key_filename: &Utf8Path,
cert_filename: &Utf8Path,
) -> anyhow::Result<CertifiedKey> {
let cert_chain = load_cert_chain(cert_filename).await?;
let key = load_private_key(key_filename).await?;
let key = rustls::crypto::ring::default_provider()
.key_provider
.load_private_key(key)?;
let certified_key = CertifiedKey::new(cert_chain, key);
certified_key.keys_match()?;
Ok(certified_key)
}
/// Implementation of [`rustls::server::ResolvesServerCert`] which reloads certificates from
/// the disk periodically.
#[derive(Debug)]
pub struct ReloadingCertificateResolver {
certified_key: ArcSwap<CertifiedKey>,
}
impl ReloadingCertificateResolver {
/// Creates a new Resolver by loading certificate and private key from FS and
/// creating tokio::task to reload them with provided reload_period.
pub async fn new(
key_filename: &Utf8Path,
cert_filename: &Utf8Path,
reload_period: Duration,
) -> anyhow::Result<Arc<Self>> {
let this = Arc::new(Self {
certified_key: ArcSwap::from_pointee(
load_certified_key(key_filename, cert_filename).await?,
),
});
tokio::spawn({
let weak_this = Arc::downgrade(&this);
let key_filename = key_filename.to_owned();
let cert_filename = cert_filename.to_owned();
async move {
let start = tokio::time::Instant::now() + reload_period;
let mut interval = tokio::time::interval_at(start, reload_period);
let mut last_reload_failed = false;
loop {
interval.tick().await;
let this = match weak_this.upgrade() {
Some(this) => this,
None => break, // Resolver has been destroyed, exit.
};
match load_certified_key(&key_filename, &cert_filename).await {
Ok(new_certified_key) => {
if new_certified_key.cert == this.certified_key.load().cert {
tracing::debug!("Certificate has not changed since last reloading");
} else {
tracing::info!("Certificate has been reloaded");
this.certified_key.store(Arc::new(new_certified_key));
}
last_reload_failed = false;
}
Err(err) => {
// Note: Reloading certs may fail if it conflicts with the script updating
// the files at the same time. Warn only if the error is persistent.
if last_reload_failed {
tracing::warn!("Error reloading certificate: {err:#}");
} else {
tracing::info!("Error reloading certificate: {err:#}");
}
last_reload_failed = true;
}
}
}
}
});
Ok(this)
}
}
impl ResolvesServerCert for ReloadingCertificateResolver {
fn resolve(&self, _client_hello: ClientHello<'_>) -> Option<Arc<CertifiedKey>> {
Some(self.certified_key.load_full())
}
}

View File

@@ -61,9 +61,6 @@ pub struct ConfigToml {
pub listen_https_addr: Option<String>,
pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf,
#[serde(with = "humantime_serde")]
pub ssl_cert_reload_period: Duration,
pub ssl_ca_file: Option<Utf8PathBuf>,
pub availability_zone: Option<String>,
#[serde(with = "humantime_serde")]
pub wait_lsn_timeout: Duration,
@@ -285,6 +282,12 @@ pub struct TenantConfigToml {
/// Level0 delta layer threshold at which to stall layer flushes. Must be >compaction_threshold
/// to avoid deadlock. 0 to disable. Disabled by default.
pub l0_flush_stall_threshold: Option<usize>,
/// If true, Level0 delta layer flushes will wait for S3 upload before flushing the next
/// layer. This is a temporary backpressure mechanism which should be removed once
/// l0_flush_{delay,stall}_threshold is fully enabled.
///
/// TODO: this is no longer enabled, remove it when the config option is no longer set.
pub l0_flush_wait_upload: bool,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is #of bytes of WAL.
@@ -436,8 +439,6 @@ impl Default for ConfigToml {
listen_https_addr: (None),
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_file: None,
availability_zone: (None),
wait_lsn_timeout: (humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
@@ -573,6 +574,8 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
crate::models::CompactionAlgorithm::Legacy;
pub const DEFAULT_L0_FLUSH_WAIT_UPLOAD: bool = false;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
@@ -619,6 +622,7 @@ impl Default for TenantConfigToml {
compaction_l0_semaphore: DEFAULT_COMPACTION_L0_SEMAPHORE,
l0_flush_delay_threshold: None,
l0_flush_stall_threshold: None,
l0_flush_wait_upload: DEFAULT_L0_FLUSH_WAIT_UPLOAD,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),

View File

@@ -523,6 +523,8 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_stall_threshold: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_wait_upload: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_horizon: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_period: FieldPatch<String>,
@@ -612,6 +614,9 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub l0_flush_stall_threshold: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub l0_flush_wait_upload: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub gc_horizon: Option<u64>,
@@ -707,6 +712,7 @@ impl TenantConfig {
mut compaction_l0_semaphore,
mut l0_flush_delay_threshold,
mut l0_flush_stall_threshold,
mut l0_flush_wait_upload,
mut gc_horizon,
mut gc_period,
mut image_creation_threshold,
@@ -759,6 +765,7 @@ impl TenantConfig {
patch
.l0_flush_stall_threshold
.apply(&mut l0_flush_stall_threshold);
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
patch.gc_horizon.apply(&mut gc_horizon);
patch
.gc_period
@@ -837,6 +844,7 @@ impl TenantConfig {
compaction_l0_semaphore,
l0_flush_delay_threshold,
l0_flush_stall_threshold,
l0_flush_wait_upload,
gc_horizon,
gc_period,
image_creation_threshold,
@@ -903,6 +911,9 @@ impl TenantConfig {
l0_flush_stall_threshold: self
.l0_flush_stall_threshold
.or(global_conf.l0_flush_stall_threshold),
l0_flush_wait_upload: self
.l0_flush_wait_upload
.unwrap_or(global_conf.l0_flush_wait_upload),
gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
image_creation_threshold: self
@@ -1353,12 +1364,6 @@ pub enum TimelineArchivalState {
Unarchived,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
pub enum TimelineVisibilityState {
Visible,
Invisible,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct TimelineArchivalConfigRequest {
pub state: TimelineArchivalState,
@@ -1491,9 +1496,6 @@ pub struct TimelineInfo {
/// The status of the rel_size migration.
pub rel_size_migration: Option<RelSizeMigration>,
/// Whether the timeline is invisible in synthetic size calculations.
pub is_invisible: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -8,9 +8,10 @@ license = "MIT/Apache-2.0"
bytes.workspace = true
fallible-iterator.workspace = true
futures-util = { workspace = true, features = ["sink"] }
tracing.workspace = true
log = "0.4"
parking_lot.workspace = true
pin-project-lite.workspace = true
phf = "0.11"
postgres-protocol2 = { path = "../postgres-protocol2" }
postgres-types2 = { path = "../postgres-types2" }
tokio = { workspace = true, features = ["io-util", "time", "net"] }

View File

@@ -6,13 +6,13 @@ use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, Stream, ready};
use log::{info, trace};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tokio_util::sync::PollSender;
use tracing::{info, trace};
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::error::DbError;

File diff suppressed because it is too large Load Diff

View File

@@ -5,9 +5,9 @@ use std::sync::Arc;
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, pin_mut};
use log::debug;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;

View File

@@ -7,11 +7,11 @@ use std::task::{Context, Poll};
use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::{Level, debug, log_enabled};
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
@@ -36,7 +36,7 @@ where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let buf = if tracing::enabled!(tracing::Level::DEBUG) {
let buf = if log_enabled!(Level::Debug) {
let params = params.into_iter().collect::<Vec<_>>();
debug!(
"executing statement {} with parameters: {:?}",

View File

@@ -6,10 +6,10 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;

View File

@@ -7,7 +7,7 @@ use http_utils::error::HttpErrorBody;
use pageserver_api::models::*;
use pageserver_api::shard::TenantShardId;
pub use reqwest::Body as ReqwestBody;
use reqwest::{IntoUrl, Method, StatusCode, Url};
use reqwest::{Certificate, IntoUrl, Method, StatusCode, Url};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -39,8 +39,8 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
#[error("request timed out: {0}")]
Timeout(String),
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -72,7 +72,24 @@ pub enum ForceAwaitLogicalSize {
}
impl Client {
pub fn new(client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
pub fn new(
mgmt_api_endpoint: String,
jwt: Option<&str>,
ssl_ca_cert: Option<Certificate>,
) -> Result<Self> {
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build().map_err(Error::CreateClient)?;
Ok(Self::from_client(http_client, mgmt_api_endpoint, jwt))
}
pub fn from_client(
client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),

View File

@@ -34,10 +34,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

View File

@@ -75,10 +75,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

View File

@@ -123,10 +123,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;

View File

@@ -81,10 +81,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;

View File

@@ -38,10 +38,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

View File

@@ -12,7 +12,6 @@ use std::time::Duration;
use anyhow::{Context, anyhow};
use camino::Utf8Path;
use clap::{Arg, ArgAction, Command};
use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
use metrics::set_build_info_metric;
use nix::sys::socket::{setsockopt, sockopt};
@@ -235,7 +234,6 @@ fn initialize_config(
.context("build toml deserializer")?,
)
.context("deserialize config toml")?;
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
.context("runtime-validation of config toml")?;
@@ -429,7 +427,7 @@ fn start_pageserver(
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
StorageControllerUpcallClient::new(conf, &shutdown_pageserver)?,
StorageControllerUpcallClient::new(conf, &shutdown_pageserver),
conf,
);
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
@@ -623,15 +621,12 @@ fn start_pageserver(
let https_task = match https_listener {
Some(https_listener) => {
let resolver = MGMT_REQUEST_RUNTIME.block_on(ReloadingCertificateResolver::new(
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,
))?;
let certs = http_utils::tls_certs::load_cert_chain(&conf.ssl_cert_file)?;
let key = http_utils::tls_certs::load_private_key(&conf.ssl_key_file)?;
let server_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(resolver);
.with_single_cert(certs, key)?;
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));

View File

@@ -17,7 +17,7 @@ use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use postgres_backend::AuthType;
use remote_storage::{RemotePath, RemoteStorageConfig};
use reqwest::{Certificate, Url};
use reqwest::Url;
use storage_broker::Uri;
use utils::id::{NodeId, TimelineId};
use utils::logging::{LogFormat, SecretString};
@@ -43,7 +43,7 @@ use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, virtual_file};
///
/// For fields that require additional validation or filling in of defaults at runtime,
/// check for examples in the [`PageServerConf::parse_and_validate`] method.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageServerConf {
// Identifier of that particular pageserver so e g safekeepers
// can safely distinguish different pageservers
@@ -56,17 +56,8 @@ pub struct PageServerConf {
/// Example: 127.0.0.1:9899
pub listen_https_addr: Option<String>,
/// Path to a file with certificate's private key for https API.
/// Default: server.key
pub ssl_key_file: Utf8PathBuf,
/// Path to a file with a X509 certificate for https API.
/// Default: server.crt
pub ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
/// Default: 60s.
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificates to use in https APIs.
pub ssl_ca_certs: Vec<Certificate>,
/// Current availability zone. Used for traffic metrics.
pub availability_zone: Option<String>,
@@ -334,8 +325,6 @@ impl PageServerConf {
listen_https_addr,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,
ssl_ca_file,
availability_zone,
wait_lsn_timeout,
wal_redo_timeout,
@@ -397,7 +386,6 @@ impl PageServerConf {
listen_https_addr,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,
availability_zone,
wait_lsn_timeout,
wal_redo_timeout,
@@ -481,13 +469,6 @@ impl PageServerConf {
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
load_previous_heatmap: load_previous_heatmap.unwrap_or(true),
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(true),
ssl_ca_certs: match ssl_ca_file {
Some(ssl_ca_file) => {
let buf = std::fs::read(ssl_ca_file)?;
Certificate::from_pem_bundle(&buf)?
}
None => Vec::new(),
},
};
// ------------------------------------------------------------

View File

@@ -50,13 +50,10 @@ pub trait StorageControllerUpcallApi {
impl StorageControllerUpcallClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub fn new(
conf: &'static PageServerConf,
cancel: &CancellationToken,
) -> Result<Option<Self>, reqwest::Error> {
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
let mut url = match conf.control_plane_api.as_ref() {
Some(u) => u.clone(),
None => return Ok(None),
None => return None,
};
if let Ok(mut segs) = url.path_segments_mut() {
@@ -76,16 +73,12 @@ impl StorageControllerUpcallClient {
client = client.default_headers(headers);
}
for ssl_ca_cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(ssl_ca_cert.clone());
}
Ok(Some(Self {
http_client: client.build()?,
Some(Self {
http_client: client.build().expect("Failed to construct HTTP client"),
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),
}))
})
}
#[tracing::instrument(skip_all)]

View File

@@ -669,13 +669,6 @@ paths:
Detach a timeline from its ancestor and reparent all ancestors timelines with lower `ancestor_lsn`.
Current implementation might not be retryable across failure cases, but will be enhanced in future.
Detaching should be expected to be expensive operation. Timeouts should be retried.
parameters:
- name: detach_behavior
in: query
required: false
schema:
description: Currently valid values are `v1`, `v2`
type: string
responses:
"200":
description: |
@@ -1086,7 +1079,6 @@ components:
- last_record_lsn
- disk_consistent_lsn
- state
- min_readable_lsn
properties:
timeline_id:
type: string
@@ -1133,40 +1125,6 @@ components:
applied_gc_cutoff_lsn:
type: string
format: hex
safekeepers:
$ref: "#/components/schemas/TimelineSafekeepersInfo"
TimelineSafekeepersInfo:
type: object
required:
- tenant_id
- timeline_id
- generation
- safekeepers
properties:
tenant_id:
type: string
format: hex
timeline_id:
type: string
format: hex
generation:
type: integer
safekeepers:
type: array
items:
$ref: "#/components/schemas/TimelineSafekeeperInfo"
TimelineSafekeeperInfo:
type: object
required:
- id
- hostname
properties:
id:
type: integer
hostname:
type: string
SyntheticSizeResponse:
type: object

View File

@@ -37,8 +37,8 @@ use pageserver_api::models::{
TenantShardSplitResponse, TenantSorting, TenantState, TenantWaitLsnRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateRequestMode,
TimelineCreateRequestModeImportPgdata, TimelineGcRequest, TimelineInfo,
TimelinePatchIndexPartRequest, TimelineVisibilityState, TimelinesInfoAndOffloaded,
TopTenantShardItem, TopTenantShardsRequest, TopTenantShardsResponse,
TimelinePatchIndexPartRequest, TimelinesInfoAndOffloaded, TopTenantShardItem,
TopTenantShardsRequest, TopTenantShardsResponse,
};
use pageserver_api::shard::{ShardCount, TenantShardId};
use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError};
@@ -439,7 +439,6 @@ async fn build_timeline_info_common(
let remote_consistent_lsn_visible = timeline
.get_remote_consistent_lsn_visible()
.unwrap_or(Lsn(0));
let is_invisible = timeline.remote_client.is_invisible().unwrap_or(false);
let walreceiver_status = timeline.walreceiver_status();
@@ -483,7 +482,6 @@ async fn build_timeline_info_common(
state,
is_archived: Some(is_archived),
rel_size_migration: Some(timeline.get_rel_size_v2_status()),
is_invisible: Some(is_invisible),
walreceiver_status,
};
@@ -2256,6 +2254,7 @@ async fn timeline_compact_handler(
let state = get_state(&request);
let mut flags = EnumSet::empty();
flags |= CompactFlags::NoYield; // run compaction to completion
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
@@ -2334,28 +2333,6 @@ async fn timeline_compact_handler(
.await
}
async fn timeline_mark_invisible_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let timeline = tenant.get_timeline(timeline_id, true)?;
timeline.remote_client.schedule_index_upload_for_timeline_invisible_state(TimelineVisibilityState::Invisible).map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_timeline_mark_invisible", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}
// Run offload immediately on given timeline.
async fn timeline_offload_handler(
request: Request<Body>,
@@ -2416,6 +2393,7 @@ async fn timeline_checkpoint_handler(
let state = get_state(&request);
let mut flags = EnumSet::empty();
flags |= CompactFlags::NoYield; // run compaction to completion
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
}
@@ -3772,10 +3750,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload",
|r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/mark_invisible",
|r| api_handler( r, timeline_mark_invisible_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
|r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),

View File

@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
use enum_map::{Enum as _, EnumMap};
use futures::Future;
use metrics::{
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
@@ -499,6 +499,15 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});
static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_flush_wait_upload_seconds",
"Time spent waiting for preceding uploads during layer flush",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
@@ -2855,6 +2864,7 @@ pub(crate) struct TimelineMetrics {
timeline_id: String,
pub flush_time_histo: StorageTimeMetrics,
pub flush_delay_histo: StorageTimeMetrics,
pub flush_wait_upload_time_gauge: Gauge,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
@@ -2906,6 +2916,9 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let compact_time_histo = StorageTimeMetrics::new(
StorageTimeOperation::Compact,
&tenant_id,
@@ -3033,6 +3046,7 @@ impl TimelineMetrics {
timeline_id,
flush_time_histo,
flush_delay_histo,
flush_wait_upload_time_gauge,
compact_time_histo,
create_images_time_histo,
logical_size_histo,
@@ -3082,6 +3096,14 @@ impl TimelineMetrics {
self.resident_physical_size_gauge.get()
}
pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) {
self.flush_wait_upload_time_gauge.add(duration);
crate::metrics::FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id])
.unwrap()
.add(duration);
}
/// Generates TIMELINE_LAYER labels for a persistent layer.
fn make_layer_labels(&self, layer_desc: &PersistentLayerDesc) -> [&str; 5] {
let level = match LayerMap::is_l0(&layer_desc.key_range, layer_desc.is_delta()) {
@@ -3185,6 +3207,7 @@ impl TimelineMetrics {
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());

View File

@@ -237,7 +237,7 @@ pub async fn libpq_listener_main(
type ConnectionHandlerResult = anyhow::Result<()>;
#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
#[instrument(skip_all, fields(peer_addr, application_name))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
conf: &'static PageServerConf,
@@ -2512,58 +2512,6 @@ impl PageServiceCmd {
}
}
/// Parse the startup options from the postgres wire protocol startup packet.
///
/// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
/// by best effort and returns all the options parsed (key-value pairs) and a bool indicating
/// whether all options are successfully parsed. There could be duplicates in the options
/// if the caller passed such parameters.
fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
let mut parsing_config = false;
let mut has_error = false;
let mut config = Vec::new();
for item in options.split_whitespace() {
if item == "-c" {
if !parsing_config {
parsing_config = true;
} else {
// "-c" followed with another "-c"
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
}
} else if item.starts_with("-c") || parsing_config {
let Some((mut key, value)) = item.split_once('=') else {
// "-c" followed with an invalid option
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
};
if !parsing_config {
// Parse "-coptions=X"
let Some(stripped_key) = key.strip_prefix("-c") else {
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
};
key = stripped_key;
}
config.push((key.to_string(), value.to_string()));
parsing_config = false;
} else {
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
}
}
if parsing_config {
// "-c" without the option
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
}
(config, has_error)
}
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
@@ -2608,14 +2556,6 @@ where
if let Some(app_name) = params.get("application_name") {
Span::current().record("application_name", field::display(app_name));
}
if let Some(options) = params.get("options") {
let (config, _) = parse_options(options);
for (key, value) in config {
if key == "neon.compute_mode" {
Span::current().record("compute_mode", field::display(value));
}
}
}
};
Ok(())
@@ -2729,7 +2669,6 @@ where
PageServiceCmd::Set => {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
// TODO: allow setting options, i.e., application_name/compute_mode via SET commands
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
PageServiceCmd::LeaseLsn(LeaseLsnCmd {
@@ -3004,46 +2943,4 @@ mod tests {
let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
assert!(cmd.is_err());
}
#[test]
fn test_parse_options() {
let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
assert!(!has_error);
assert_eq!(
config,
vec![("neon.compute_mode".to_string(), "primary".to_string())]
);
let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
assert!(!has_error);
assert_eq!(
config,
vec![
("neon.compute_mode".to_string(), "primary".to_string()),
("foo".to_string(), "bar".to_string()),
]
);
let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
assert!(!has_error);
assert_eq!(
config,
vec![
("neon.compute_mode".to_string(), "primary".to_string()),
("foo".to_string(), "bar".to_string()),
]
);
let (_, has_error) = parse_options("-c");
assert!(has_error);
let (_, has_error) = parse_options("-c foo=bar -c -c");
assert!(has_error);
let (_, has_error) = parse_options(" ");
assert!(!has_error);
let (_, has_error) = parse_options(" -c neon.compute_mode");
assert!(has_error);
}
}

View File

@@ -38,7 +38,6 @@ use std::panic::AssertUnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::FutureExt;
use once_cell::sync::Lazy;
@@ -585,25 +584,18 @@ pub async fn shutdown_tasks(
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
.await
.is_err()
{
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for task {} to shut down", task.name);
loop {
tokio::select! {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
_ = &mut join_handle => break,
_ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
}
}
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
let _ = join_handle.await;
info!("task {} completed", task.name);
}
} else {

View File

@@ -3080,7 +3080,6 @@ impl Tenant {
let mut has_pending_l0 = false;
for timeline in compact_l0 {
let ctx = &ctx.with_scope_timeline(&timeline);
// NB: don't set CompactFlags::YieldForL0, since this is an L0-only compaction pass.
let outcome = timeline
.compact(cancel, CompactFlags::OnlyL0Compaction.into(), ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))
@@ -3098,9 +3097,14 @@ impl Tenant {
}
}
// Pass 2: image compaction and timeline offloading. If any timelines have accumulated more
// L0 layers, they may also be compacted here. Image compaction will yield if there is
// pending L0 compaction on any tenant timeline.
// Pass 2: image compaction and timeline offloading. If any timelines have accumulated
// more L0 layers, they may also be compacted here.
//
// NB: image compaction may yield if there is pending L0 compaction.
//
// TODO: it will only yield if there is pending L0 compaction on the same timeline. If a
// different timeline needs compaction, it won't. It should check `l0_compaction_trigger`.
// We leave this for a later PR.
//
// TODO: consider ordering timelines by some priority, e.g. time since last full compaction,
// amount of L1 delta debt or garbage, offload-eligible timelines first, etc.
@@ -3111,14 +3115,8 @@ impl Tenant {
}
let ctx = &ctx.with_scope_timeline(&timeline);
// Yield for L0 if the separate L0 pass is enabled (otherwise there's no point).
let mut flags = EnumSet::default();
if self.get_compaction_l0_first() {
flags |= CompactFlags::YieldForL0;
}
let mut outcome = timeline
.compact(cancel, flags, ctx)
.compact(cancel, EnumSet::default(), ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))
.await
.inspect_err(|err| self.maybe_trip_compaction_breaker(err))?;
@@ -6518,7 +6516,11 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.await?;
let mut writer = tline.writer().await;
@@ -6535,7 +6537,11 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.await?;
let mut writer = tline.writer().await;
@@ -6552,7 +6558,11 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.await?;
let mut writer = tline.writer().await;
@@ -6569,7 +6579,11 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.await?;
assert_eq!(
@@ -6652,7 +6666,9 @@ mod tests {
timeline.freeze_and_flush().await?;
if compact {
// this requires timeline to be &Arc<Timeline>
timeline.compact(&cancel, EnumSet::default(), ctx).await?;
timeline
.compact(&cancel, CompactFlags::NoYield.into(), ctx)
.await?;
}
// this doesn't really need to use the timeline_id target, but it is closer to what it
@@ -6979,6 +6995,7 @@ mod tests {
child_timeline.freeze_and_flush().await?;
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
child_timeline
.compact(&CancellationToken::new(), flags, &ctx)
.await?;
@@ -7357,7 +7374,9 @@ mod tests {
// Perform a cycle of flush, compact, and GC
tline.freeze_and_flush().await?;
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
tline
.compact(&cancel, CompactFlags::NoYield.into(), &ctx)
.await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
@@ -7686,6 +7705,7 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
} else {
EnumSet::empty()
@@ -7736,7 +7756,9 @@ mod tests {
let before_num_l0_delta_files =
tline.layers.read().await.layer_map()?.level0_deltas().len();
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
tline
.compact(&cancel, CompactFlags::NoYield.into(), &ctx)
.await?;
let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();
@@ -7901,6 +7923,7 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,
@@ -8363,6 +8386,7 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,
@@ -8430,6 +8454,7 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,
@@ -11526,255 +11551,4 @@ mod tests {
Ok(())
}
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_synthetic_size_calculation_with_invisible_branches() -> anyhow::Result<()> {
use pageserver_api::models::TimelineVisibilityState;
use crate::tenant::size::gather_inputs;
let tenant_conf = pageserver_api::models::TenantConfig {
// Ensure that we don't compute gc_cutoffs (which needs reading the layer files)
pitr_interval: Some(Duration::ZERO),
..Default::default()
};
let harness = TenantHarness::create_custom(
"test_synthetic_size_calculation_with_invisible_branches",
tenant_conf,
TenantId::generate(),
ShardIdentity::unsharded(),
Generation::new(0xdeadbeef),
)
.await?;
let (tenant, ctx) = harness.load().await;
let main_tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![],
vec![],
vec![],
Lsn(0x100),
)
.await?;
let snapshot1 = TimelineId::from_array(hex!("11223344556677881122334455667790"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot1,
Some(Lsn(0x20)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot2 = TimelineId::from_array(hex!("11223344556677881122334455667791"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot2,
Some(Lsn(0x30)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot3 = TimelineId::from_array(hex!("11223344556677881122334455667792"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot3,
Some(Lsn(0x40)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let limit = Arc::new(Semaphore::new(1));
let max_retention_period = None;
let mut logical_size_cache = HashMap::new();
let cause = LogicalSizeCalculationCause::EvictionTaskImitation;
let cancel = CancellationToken::new();
let inputs = gather_inputs(
&tenant,
&limit,
max_retention_period,
&mut logical_size_cache,
cause,
&cancel,
&ctx,
)
.instrument(info_span!(
"gather_inputs",
tenant_id = "unknown",
shard_id = "unknown",
))
.await?;
use crate::tenant::size::{LsnKind, ModelInputs, SegmentMeta};
use LsnKind::*;
use tenant_size_model::Segment;
let ModelInputs { mut segments, .. } = inputs;
segments.retain(|s| s.timeline_id == TIMELINE_ID);
for segment in segments.iter_mut() {
segment.segment.parent = None; // We don't care about the parent for the test
segment.segment.size = None; // We don't care about the size for the test
}
assert_eq!(
segments,
[
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x10,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchStart,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x20,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x30,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x100,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: GcCutOff,
}, // we need to retain everything above the last branch point
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x100,
size: None,
needed: true,
},
timeline_id: TIMELINE_ID,
kind: BranchEnd,
},
]
);
main_tline
.remote_client
.schedule_index_upload_for_timeline_invisible_state(
TimelineVisibilityState::Invisible,
)?;
main_tline.remote_client.wait_completion().await?;
let inputs = gather_inputs(
&tenant,
&limit,
max_retention_period,
&mut logical_size_cache,
cause,
&cancel,
&ctx,
)
.instrument(info_span!(
"gather_inputs",
tenant_id = "unknown",
shard_id = "unknown",
))
.await?;
let ModelInputs { mut segments, .. } = inputs;
segments.retain(|s| s.timeline_id == TIMELINE_ID);
for segment in segments.iter_mut() {
segment.segment.parent = None; // We don't care about the parent for the test
segment.segment.size = None; // We don't care about the size for the test
}
assert_eq!(
segments,
[
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x10,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchStart,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x20,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x30,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40, // Branch end LSN == last branch point LSN
size: None,
needed: true,
},
timeline_id: TIMELINE_ID,
kind: BranchEnd,
},
]
);
Ok(())
}
}

View File

@@ -344,7 +344,7 @@ async fn init_load_generations(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel)? {
} else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel) {
info!("Calling {} API to re-attach tenants", client.base_url());
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach(conf).await {

View File

@@ -194,7 +194,7 @@ pub(crate) use download::{
};
use index::GcCompactionState;
pub(crate) use index::LayerFileMetadata;
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use regex::Regex;
use remote_storage::{
@@ -573,16 +573,6 @@ impl RemoteTimelineClient {
.ok()
}
/// Returns true if the timeline is invisible in synthetic size calculations.
pub(crate) fn is_invisible(&self) -> Option<bool> {
self.upload_queue
.lock()
.unwrap()
.initialized_mut()
.map(|q| q.clean.0.marked_invisible_at.is_some())
.ok()
}
/// Returns `Ok(Some(timestamp))` if the timeline has been archived, `Ok(None)` if the timeline hasn't been archived.
///
/// Return Err(_) if the remote index_part hasn't been downloaded yet, or the timeline hasn't been stopped yet.
@@ -855,37 +845,6 @@ impl RemoteTimelineClient {
Ok(need_wait)
}
pub(crate) fn schedule_index_upload_for_timeline_invisible_state(
self: &Arc<Self>,
state: TimelineVisibilityState,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
fn need_change(
marked_invisible_at: &Option<NaiveDateTime>,
state: TimelineVisibilityState,
) -> Option<bool> {
match (marked_invisible_at, state) {
(Some(_), TimelineVisibilityState::Invisible) => Some(false),
(None, TimelineVisibilityState::Invisible) => Some(true),
(Some(_), TimelineVisibilityState::Visible) => Some(false),
(None, TimelineVisibilityState::Visible) => Some(true),
}
}
let need_upload_scheduled = need_change(&upload_queue.dirty.marked_invisible_at, state);
if let Some(marked_invisible_at_set) = need_upload_scheduled {
let intended_marked_invisible_at =
marked_invisible_at_set.then(|| Utc::now().naive_utc());
upload_queue.dirty.marked_invisible_at = intended_marked_invisible_at;
self.schedule_index_upload(upload_queue);
}
Ok(())
}
/// Shuts the timeline client down, but only if the timeline is archived.
///
/// This function and [`Self::schedule_index_upload_for_timeline_archival_state`] use the
@@ -1968,7 +1927,9 @@ impl RemoteTimelineClient {
/// Pick next tasks from the queue, and start as many of them as possible without violating
/// the ordering constraints.
///
/// The number of inprogress tasks is limited by `Self::inprogress_tasks`, see `next_ready`.
/// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does.
/// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
/// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
debug!("starting op: {next_op}");
@@ -2216,11 +2177,6 @@ impl RemoteTimelineClient {
}
res
}
// TODO: this should wait for the deletion to be executed by the deletion queue.
// Otherwise, the deletion may race with an upload and wrongfully delete a newer
// file. Some of the above logic attempts to work around this, it should be replaced
// by the upload queue ordering guarantees (see `can_bypass`). See:
// <https://github.com/neondatabase/neon/issues/10283>.
UploadOp::Delete(delete) => {
if self.config.read().unwrap().block_deletions {
let mut queue_locked = self.upload_queue.lock().unwrap();

View File

@@ -110,10 +110,6 @@ pub struct IndexPart {
/// just the specific use case here; it needs a new name.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) gc_compaction: Option<GcCompactionState>,
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
@@ -141,11 +137,10 @@ impl IndexPart {
/// - 11: +rel_size_migration
/// - 12: +l2_lsn
/// - 13: +gc_compaction
/// - 14: +marked_invisible_at
const LATEST_VERSION: usize = 14;
const LATEST_VERSION: usize = 13;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -164,7 +159,6 @@ impl IndexPart {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
}
}
@@ -474,7 +468,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -523,7 +516,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -573,7 +565,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -626,7 +617,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -674,7 +664,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -725,7 +714,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -781,7 +769,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -842,7 +829,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -904,7 +890,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -971,7 +956,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1051,7 +1035,6 @@ mod tests {
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1132,7 +1115,6 @@ mod tests {
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1142,7 +1124,7 @@ mod tests {
#[test]
fn v12_v13_l2_gc_ompaction_is_parsed() {
let example = r#"{
"version": 13,
"version": 12,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
@@ -1178,7 +1160,7 @@ mod tests {
}"#;
let expected = IndexPart {
version: 13,
version: 12,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
@@ -1219,95 +1201,6 @@ mod tests {
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v14_marked_invisible_at_is_parsed() {
let example = r#"{
"version": 14,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123",
"reasons": ["DetachAncestor"]
},
"import_pgdata": {
"V1": {
"Done": {
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
"started_at": "2024-11-13T09:23:42.123",
"finished_at": "2024-11-13T09:42:23.123"
}
}
},
"rel_size_migration": "legacy",
"l2_lsn": "0/16960E8",
"gc_compaction": {
"last_completed_lsn": "0/16960E8"
},
"marked_invisible_at": "2023-07-31T09:00:00.123"
}"#;
let expected = IndexPart {
version: 14,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
}),
last_aux_file_policy: Default::default(),
archived_at: None,
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
}))),
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();

View File

@@ -33,7 +33,7 @@ pub struct ModelInputs {
}
/// A [`Segment`], with some extra information for display purposes
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
pub segment: Segment,
pub timeline_id: TimelineId,
@@ -248,8 +248,6 @@ pub(super) async fn gather_inputs(
None
};
let branch_is_invisible = timeline.is_invisible() == Some(true);
let lease_points = gc_info
.leases
.keys()
@@ -273,10 +271,7 @@ pub(super) async fn gather_inputs(
.map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
.collect::<Vec<_>>();
if !branch_is_invisible {
// Do not count lease points for invisible branches.
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
}
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
drop(gc_info);
@@ -292,9 +287,7 @@ pub(super) async fn gather_inputs(
// Add a point for the PITR cutoff
let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
if !branch_start_needed && !branch_is_invisible {
// Only add the GcCutOff point when the timeline is visible; otherwise, do not compute the size for the LSN
// range from the last branch point to the latest data.
if !branch_start_needed {
lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
}
@@ -380,19 +373,11 @@ pub(super) async fn gather_inputs(
}
}
let branch_end_lsn = if branch_is_invisible {
// If the branch is invisible, the branch end is the last requested LSN (likely a branch cutoff point).
segments.last().unwrap().segment.lsn
} else {
// Otherwise, the branch end is the last record LSN.
last_record_lsn.0
};
// Current end of the timeline
segments.push(SegmentMeta {
segment: Segment {
parent: Some(parent),
lsn: branch_end_lsn,
lsn: last_record_lsn.0,
size: None, // Filled in later, if necessary
needed: true,
},
@@ -624,7 +609,6 @@ async fn calculate_logical_size(
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}
#[cfg(test)]
#[test]
fn verify_size_for_multiple_branches() {
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
@@ -782,7 +766,6 @@ fn verify_size_for_multiple_branches() {
assert_eq!(inputs.calculate(), 37_851_408);
}
#[cfg(test)]
#[test]
fn verify_size_for_one_branch() {
let doc = r#"

View File

@@ -84,8 +84,8 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
use super::remote_timeline_client::index::{GcCompactionState, IndexPart};
use super::remote_timeline_client::{RemoteTimelineClient, WaitCompletionError};
use super::secondary::heatmap::HeatMapLayer;
use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::tasks::log_compaction_error;
@@ -870,14 +870,9 @@ pub(crate) enum CompactFlags {
OnlyL0Compaction,
EnhancedGcBottomMostCompaction,
DryRun,
/// Makes image compaction yield if there's pending L0 compaction. This should always be used in
/// the background compaction task, since we want to aggressively compact down L0 to bound
/// read amplification.
///
/// It only makes sense to use this when `compaction_l0_first` is enabled (such that we yield to
/// an L0 compaction pass), and without `OnlyL0Compaction` (L0 compaction shouldn't yield for L0
/// compaction).
YieldForL0,
/// Disables compaction yielding e.g. due to high L0 count. This is set e.g. when requesting
/// compaction via HTTP API.
NoYield,
}
#[serde_with::serde_as]
@@ -1896,19 +1891,18 @@ impl Timeline {
// out by other background tasks (including image compaction). We request this via
// `BackgroundLoopKind::L0Compaction`.
//
// Yield for pending L0 compaction while waiting for the semaphore.
// If this is a regular compaction pass, and L0-only compaction is enabled in the config,
// then we should yield for immediate L0 compaction if necessary while we're waiting for the
// background task semaphore. There's no point yielding otherwise, since we'd just end up
// right back here.
let is_l0_only = options.flags.contains(CompactFlags::OnlyL0Compaction);
let semaphore_kind = match is_l0_only && self.get_compaction_l0_semaphore() {
true => BackgroundLoopKind::L0Compaction,
false => BackgroundLoopKind::Compaction,
};
let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
if yield_for_l0 {
// If this is an L0 pass, it doesn't make sense to yield for L0.
debug_assert!(!is_l0_only, "YieldForL0 during L0 pass");
// If `compaction_l0_first` is disabled, there's no point yielding.
debug_assert!(self.get_compaction_l0_first(), "YieldForL0 without L0 pass");
}
let yield_for_l0 = !is_l0_only
&& self.get_compaction_l0_first()
&& !options.flags.contains(CompactFlags::NoYield);
let acquire = async move {
let guard = self.compaction_lock.lock().await;
@@ -2215,10 +2209,6 @@ impl Timeline {
self.remote_client.is_archived()
}
pub(crate) fn is_invisible(&self) -> Option<bool> {
self.remote_client.is_invisible()
}
pub(crate) fn is_stopping(&self) -> bool {
self.current_state() == TimelineState::Stopping
}
@@ -2572,6 +2562,14 @@ impl Timeline {
Some(max(l0_flush_stall_threshold, compaction_threshold))
}
fn get_l0_flush_wait_upload(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.l0_flush_wait_upload
.unwrap_or(self.conf.default_tenant_conf.l0_flush_wait_upload)
}
fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -4593,6 +4591,27 @@ impl Timeline {
// release lock on 'layers'
};
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
// This makes us refuse ingest until the new layers have been persisted to the remote
// TODO: remove this, and rely on l0_flush_{delay,stall}_threshold instead.
if self.get_l0_flush_wait_upload() {
let start = Instant::now();
self.remote_client
.wait_completion()
.await
.map_err(|e| match e {
WaitCompletionError::UploadQueueShutDownOrStopped
| WaitCompletionError::NotInitialized(
NotInitialized::ShuttingDown | NotInitialized::Stopped,
) => FlushLayerError::Cancelled,
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
FlushLayerError::Other(anyhow!(e).into())
}
})?;
let duration = start.elapsed().as_secs_f64();
self.metrics.flush_wait_upload_time_gauge_add(duration);
}
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this

View File

@@ -394,8 +394,8 @@ impl GcCompactionQueue {
if job.dry_run {
flags |= CompactFlags::DryRun;
}
if options.flags.contains(CompactFlags::YieldForL0) {
flags |= CompactFlags::YieldForL0;
if options.flags.contains(CompactFlags::NoYield) {
flags |= CompactFlags::NoYield;
}
let options = CompactOptions {
flags,
@@ -983,7 +983,7 @@ impl Timeline {
// Yield if we have pending L0 compaction. The scheduler will do another pass.
if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
&& options.flags.contains(CompactFlags::YieldForL0)
&& !options.flags.contains(CompactFlags::NoYield)
{
info!("image/ancestor compaction yielding for L0 compaction");
return Ok(CompactionOutcome::YieldForL0);
@@ -1028,7 +1028,7 @@ impl Timeline {
.load()
.as_ref()
.clone(),
options.flags.contains(CompactFlags::YieldForL0),
!options.flags.contains(CompactFlags::NoYield),
)
.await
.inspect_err(|err| {
@@ -2635,7 +2635,7 @@ impl Timeline {
) -> Result<CompactionOutcome, CompactionError> {
let sub_compaction = options.sub_compaction;
let job = GcCompactJob::from_compact_options(options.clone());
let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
let no_yield = options.flags.contains(CompactFlags::NoYield);
if sub_compaction {
info!(
"running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
@@ -2650,7 +2650,7 @@ impl Timeline {
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
self.compact_with_gc_inner(cancel, job, ctx, no_yield)
.await?;
}
if jobs_len == 0 {
@@ -2658,8 +2658,7 @@ impl Timeline {
}
return Ok(CompactionOutcome::Done);
}
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
.await
self.compact_with_gc_inner(cancel, job, ctx, no_yield).await
}
async fn compact_with_gc_inner(
@@ -2667,7 +2666,7 @@ impl Timeline {
cancel: &CancellationToken,
job: GcCompactJob,
ctx: &RequestContext,
yield_for_l0: bool,
no_yield: bool,
) -> Result<CompactionOutcome, CompactionError> {
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
@@ -2937,15 +2936,18 @@ impl Timeline {
if cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
let should_yield = yield_for_l0
&& self
if !no_yield {
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
if should_yield {
tracing::info!(
"preempt gc-compaction when downloading layers: too many L0 layers"
);
return Ok(CompactionOutcome::YieldForL0);
}
}
let resident_layer = layer
.download_and_keep_resident(ctx)
@@ -3079,17 +3081,21 @@ impl Timeline {
return Err(CompactionError::ShuttingDown);
}
keys_processed += 1;
let should_yield = yield_for_l0
&& keys_processed % 1000 == 0
&& self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
if !no_yield {
keys_processed += 1;
if keys_processed % 1000 == 0 {
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!(
"preempt gc-compaction in the main loop: too many L0 layers"
);
return Ok(CompactionOutcome::YieldForL0);
}
}
}
if self.shard_identity.is_key_disposable(&key) {
// If this shard does not need to store this key, simply skip it.

View File

@@ -235,7 +235,7 @@ pub(super) async fn prepare(
return Err(NoAncestor);
}
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn, behavior)?;
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
if let DetachBehavior::MultiLevelAndNoReparent = behavior {
// If the ancestor has an ancestor, we might be able to fast-path detach it if the current ancestor does not have any data written/used by the detaching timeline.
@@ -249,13 +249,7 @@ pub(super) async fn prepare(
ancestor_lsn = ancestor.ancestor_lsn; // Get the LSN first before resetting the `ancestor` variable
ancestor = ancestor_of_ancestor;
// TODO: do we still need to check if we don't want to reparent?
check_no_archived_children_of_ancestor(
tenant,
detached,
&ancestor,
ancestor_lsn,
behavior,
)?;
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
}
} else if ancestor.ancestor_timeline.is_some() {
// non-technical requirement; we could flatten N ancestors just as easily but we chose
@@ -1162,44 +1156,31 @@ fn check_no_archived_children_of_ancestor(
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,
detach_behavior: DetachBehavior,
) -> Result<(), Error> {
match detach_behavior {
DetachBehavior::NoAncestorAndReparent => {
let timelines = tenant.timelines.lock().unwrap();
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
for timeline in
reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn)
{
if timeline.is_archived() == Some(true) {
return Err(Error::Archived(timeline.timeline_id));
}
}
for timeline_offloaded in timelines_offloaded.values() {
if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
continue;
}
// This forbids the detach ancestor feature if flattened timelines are present,
// even if the ancestor_lsn is from after the branchpoint of the detached timeline.
// But as per current design, we don't record the ancestor_lsn of flattened timelines.
// This is a bit unfortunate, but as of writing this we don't support flattening
// anyway. Maybe we can evolve the data model in the future.
if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
let is_earlier = retain_lsn <= ancestor_lsn;
if !is_earlier {
continue;
}
}
return Err(Error::Archived(timeline_offloaded.timeline_id));
}
}
DetachBehavior::MultiLevelAndNoReparent => {
// We don't need to check anything if the user requested to not reparent.
let timelines = tenant.timelines.lock().unwrap();
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
for timeline in reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn) {
if timeline.is_archived() == Some(true) {
return Err(Error::Archived(timeline.timeline_id));
}
}
for timeline_offloaded in timelines_offloaded.values() {
if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
continue;
}
// This forbids the detach ancestor feature if flattened timelines are present,
// even if the ancestor_lsn is from after the branchpoint of the detached timeline.
// But as per current design, we don't record the ancestor_lsn of flattened timelines.
// This is a bit unfortunate, but as of writing this we don't support flattening
// anyway. Maybe we can evolve the data model in the future.
if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
let is_earlier = retain_lsn <= ancestor_lsn;
if !is_earlier {
continue;
}
}
return Err(Error::Archived(timeline_offloaded.timeline_id));
}
Ok(())
}

View File

@@ -647,25 +647,18 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return found;
}
#if PG_MAJORVERSION_NUM >= 16
static PGIOAlignedBlock voidblock = {0};
#else
static PGAlignedBlock voidblock = {0};
#endif
#define SCRIBBLEPAGE (&voidblock.data)
/*
* Try to read pages from local cache.
* Returns the number of pages read from the local cache, and sets bits in
* 'mask' for the pages which were read. This may scribble over buffers not
* marked in 'mask', so be careful with operation ordering.
* 'read' for the pages which were read. This may scribble over buffers not
* marked in 'read', so be careful with operation ordering.
*
* In case of error local file cache is disabled (lfc->limit is set to zero),
* and -1 is returned.
* and -1 is returned. Note that 'read' and the buffers may be touched and in
* an otherwise invalid state.
*
* If the mask argument is supplied, we'll only try to read those pages which
* don't have their bits set on entry. At exit, pages which were successfully
* read from LFC will have their bits set.
* If the mask argument is supplied, bits will be set at the offsets of pages
* that were present and read from the LFC.
*/
int
lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -700,43 +693,23 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int8 chunk_mask[BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1));
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
int n_blocks_to_read = 0;
int iov_last_used = 0;
int first_block_in_chunk_read = -1;
int n_blocks_to_read = 0;
ConditionVariable* cv;
Assert(blocks_in_chunk > 0);
for (int i = 0; i < blocks_in_chunk; i++)
{
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
iov[i].iov_base = buffers[buf_offset + i];
iov[i].iov_len = BLCKSZ;
/* mask not set = we must do work */
if (!BITMAP_ISSET(mask, buf_offset + i))
{
iov[i].iov_base = buffers[buf_offset + i];
n_blocks_to_read++;
iov_last_used = i + 1;
if (first_block_in_chunk_read == -1)
{
first_block_in_chunk_read = i;
}
}
/* mask set = we must do no work */
else
{
/* don't scribble on pages we weren't requested to write to */
iov[i].iov_base = SCRIBBLEPAGE;
}
BITMAP_CLR(mask, buf_offset + i);
}
/* shortcut IO */
if (n_blocks_to_read == 0)
{
buf_offset += blocks_in_chunk;
@@ -745,12 +718,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
continue;
}
/*
* The effective iov size must be >= the number of blocks we're about
* to read.
*/
Assert(iov_last_used - first_block_in_chunk_read >= n_blocks_to_read);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -795,15 +762,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
generation = lfc_ctl->generation;
entry_offset = entry->offset;
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
for (int i = 0; i < blocks_in_chunk; i++)
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
/* no need to work on something we're not interested in */
if (BITMAP_ISSET(mask, buf_offset + i))
continue;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -827,7 +789,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
if (state == AVAILABLE)
{
BITMAP_SET(chunk_mask, i);
BITMAP_SET(mask, buf_offset + i);
iteration_hits++;
}
else
@@ -839,34 +801,16 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
/* chunk offset (# of pages) into the LFC file */
off_t first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK;
int nwrite = iov_last_used - first_block_in_chunk_read;
/* offset of first IOV */
first_read_offset += chunk_offs + first_block_in_chunk_read;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
/* Read only the blocks we're interested in, limiting */
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
nwrite, first_read_offset * BLCKSZ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * nwrite))
if (rc != (BLCKSZ * blocks_in_chunk))
{
lfc_disable("read");
return -1;
}
/*
* We successfully read the pages we know were valid when we
* started reading; now mark those pages as read
*/
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
{
if (BITMAP_ISSET(chunk_mask, i))
BITMAP_SET(mask, buf_offset + i);
}
}
/* Place entry to the head of LRU list */
@@ -1069,9 +1013,6 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (found)
{
state = GET_STATE(entry, chunk_offs);
@@ -1222,13 +1163,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
/* Approximate working set for the blocks assumed in this entry */
for (int i = 0; i < blocks_in_chunk; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
/*

View File

@@ -15,7 +15,6 @@
#include "postgres.h"
#include <math.h>
#include <sys/socket.h>
#include "libpq-int.h"
@@ -51,20 +50,6 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
enum NeonComputeMode {
CP_MODE_PRIMARY = 0,
CP_MODE_REPLICA,
CP_MODE_STATIC
};
static const struct config_enum_entry neon_compute_modes[] = {
{"primary", CP_MODE_PRIMARY, false},
{"replica", CP_MODE_REPLICA, false},
{"static", CP_MODE_STATIC, false},
{NULL, 0, false}
};
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -77,13 +62,11 @@ int flush_every_n_requests = 8;
int neon_protocol_version = 2;
static int neon_compute_mode = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
static int pageserver_response_log_timeout = 10000;
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
static int pageserver_response_disconnect_timeout = 150000;
static int pageserver_response_disconnect_timeout = 120000; /* 2 minutes */
typedef struct
{
@@ -407,10 +390,9 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
case PS_Disconnected:
{
const char *keywords[5];
const char *values[5];
char pid_str[16] = { 0 };
char endpoint_str[36] = { 0 };
const char *keywords[4];
const char *values[4];
char pid_str[16];
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
@@ -482,31 +464,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
n_pgsql_params++;
}
{
bool param_set = false;
switch (neon_compute_mode)
{
case CP_MODE_PRIMARY:
strncpy(endpoint_str, "-c neon.compute_mode=primary", sizeof(endpoint_str));
param_set = true;
break;
case CP_MODE_REPLICA:
strncpy(endpoint_str, "-c neon.compute_mode=replica", sizeof(endpoint_str));
param_set = true;
break;
case CP_MODE_STATIC:
strncpy(endpoint_str, "-c neon.compute_mode=static", sizeof(endpoint_str));
param_set = true;
break;
}
if (param_set)
{
keywords[n_pgsql_params] = "options";
values[n_pgsql_params] = endpoint_str;
n_pgsql_params++;
}
}
keywords[n_pgsql_params] = NULL;
values[n_pgsql_params] = NULL;
@@ -765,24 +722,6 @@ get_socket_stats(int socketfd, int *sndbuf, int *recvbuf)
#endif
}
/*
* Tries to get the local port of a socket. Sets 'port' to -1 on error.
*/
static void
get_local_port(int socketfd, int *port)
{
struct sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
memset(&addr, 0, addr_len);
if (getsockname(socketfd, (struct sockaddr*) &addr, &addr_len) == 0)
{
*port = ntohs(addr.sin_port);
} else {
*port = -1;
}
}
/*
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
@@ -872,17 +811,15 @@ retry:
*/
if (INSTR_TIME_GET_MILLISEC(since_last_log) >= pageserver_response_log_timeout)
{
int port;
int sndbuf;
int recvbuf;
get_local_port(PQsocket(pageserver_conn), &port);
get_socket_stats(PQsocket(pageserver_conn), &sndbuf, &recvbuf);
neon_shard_log(shard_no, LOG,
"no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket port=%d sndbuf=%d recvbuf=%d) (conn start=%d end=%d)",
"no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d) (conn start=%d end=%d)",
INSTR_TIME_GET_DOUBLE(since_start),
shard->nrequests_sent, shard->nresponses_received, port, sndbuf, recvbuf,
shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf,
pageserver_conn->inStart, pageserver_conn->inEnd);
shard->receive_last_log_time = now;
shard->receive_logged = true;
@@ -904,10 +841,8 @@ retry:
*/
if (INSTR_TIME_GET_MILLISEC(since_start) >= pageserver_response_disconnect_timeout)
{
int port;
get_local_port(PQsocket(pageserver_conn), &port);
neon_shard_log(shard_no, LOG, "no response from pageserver for %0.3f s, disconnecting (socket port=%d)",
INSTR_TIME_GET_DOUBLE(since_start), port);
neon_shard_log(shard_no, LOG, "no response from pageserver for %0.3f s, disconnecting",
INSTR_TIME_GET_DOUBLE(since_start));
pageserver_disconnect(shard_no);
return -1;
}
@@ -1142,22 +1077,15 @@ pageserver_try_receive(shardno_t shard_no)
NeonResponse *resp;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
int rc;
/* read response */
int rc;
if (shard->state != PS_Connected)
return NULL;
Assert(pageserver_conn);
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (!PQconsumeInput(shard->conn))
{
return NULL;
}
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
}
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async = true */);
if (rc == 0)
return NULL;
@@ -1437,22 +1365,11 @@ pg_init_libpagestore(void)
"If the pageserver doesn't respond to a request within this timeout, "
"disconnect and reconnect.",
&pageserver_response_disconnect_timeout,
150000, 100, INT_MAX,
120000, 100, INT_MAX,
PGC_SUSET,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"neon.compute_mode",
"The compute endpoint node type",
NULL,
&neon_compute_mode,
CP_MODE_PRIMARY,
neon_compute_modes,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)

View File

@@ -315,7 +315,7 @@ static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
bits8 rv = 1;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}

View File

@@ -99,7 +99,7 @@ static char *hexdump_page(char *page);
#define IS_LOCAL_REL(reln) (\
NInfoGetDbOid(InfoFromSMgrRel(reln)) != 0 && \
NInfoGetRelNumber(InfoFromSMgrRel(reln)) >= FirstNormalObjectId \
NInfoGetRelNumber(InfoFromSMgrRel(reln)) > FirstNormalObjectId \
)
const int SmgrTrace = DEBUG5;
@@ -1040,16 +1040,6 @@ prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, n
continue;
}
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
* from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here
* under buffer lock.
*/
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forknum, blocknum + i, buffers[i]);
prefetch_set_unused(ring_index);
BITMAP_SET(mask, i);
@@ -1081,9 +1071,6 @@ prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_r
* pageserver. If NULL, we utilize the lastWrittenLsn -infrastructure
* to calculate the LSNs to send.
*
* Bits set in *mask (if present) indicate pages already read; i.e. pages we
* can skip in this process.
*
* When performing a prefetch rather than a synchronous request,
* is_prefetch==true. Currently, it only affects how the request is accounted
* in the perf counters.
@@ -1129,7 +1116,7 @@ Retry:
uint64 ring_index;
neon_request_lsns *lsns;
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
continue;
if (frlsns)
@@ -2384,6 +2371,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LSN_FORMAT_ARGS(last_written_lsn),
LSN_FORMAT_ARGS(flushlsn));
XLogFlush(last_written_lsn);
flushlsn = last_written_lsn;
}
/*
@@ -2399,35 +2387,18 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* requesting the latest page, by setting request LSN to
* UINT64_MAX.
*
* effective_request_lsn is used to check that received response is still valid.
* In case of primary node it is last written LSN. Originally we used flush_lsn here,
* but it is not correct. Consider the following scenario:
* 1. Backend A wants to prefetch block X
* 2. Backend A checks that block X is not present in the shared buffer cache
* 3. Backend A calls prefetch_do_request, which calls neon_get_request_lsns
* 4. neon_get_request_lsns obtains LwLSN=11 for the block
* 5. Backend B downloads block X, updates and wallogs it with LSN=13
* 6. Block X is once again evicted from shared buffers, its LwLSN is set to LSN=13
* 7. Backend A is still executing in neon_get_request_lsns(). It calls 'flushlsn = GetFlushRecPtr();'.
* Let's say that it is LSN=14
* 8. Backend A uses LSN=14 as effective_lsn in the prefetch slot. The request stored in the slot is
* [not_modified_since=11, effective_request_lsn=14]
* 9. Backend A sends the prefetch request, pageserver processes it, and sends response.
* The last LSN that the pageserver had processed was LSN=12, so the page image in the response is valid at LSN=12.
* 10. Backend A calls smgrread() for page X with LwLSN=13
* 11. Backend A finds in prefetch ring the response for the prefetch request with [not_modified_since=11, effective_lsn=Lsn14],
* so it satisfies neon_prefetch_response_usable condition.
*
* Things go wrong in step 7-8, when [not_modified_since=11, effective_request_lsn=14] is determined for the request.
* That is incorrect, because the page has in fact been modified at LSN=13. The invariant is that for any request,
* there should not be any modifications to a page between its not_modified_since and (effective_)request_lsn values.
*
* The problem can be fixed by callingGetFlushRecPtr() before checking if the page is in the buffer cache.
* But you can't do that within smgrprefetch(), would need to modify the caller.
* Remember the current LSN, however, so that we can later
* correctly determine if the response to the request is still
* valid. The most up-to-date LSN we could use for that purpose
* would be the current insert LSN, but to avoid the overhead of
* looking it up, use 'flushlsn' instead. This relies on the
* assumption that if the page was modified since the last WAL
* flush, it should still be in the buffer cache, and we
* wouldn't be requesting it.
*/
result->request_lsn = UINT64_MAX;
result->not_modified_since = last_written_lsn;
result->effective_request_lsn = last_written_lsn;
result->effective_request_lsn = flushlsn;
}
}
}
@@ -2486,8 +2457,11 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
* we remember `effective_request_lsn` separately. In a primary,
* `effective_request_lsn` is the same as `not_modified_since`.
* See comments in neon_get_request_lsns why we can not use last flush WAL position here.
* `effective_request_lsn` is the last flush WAL position when the request
* was sent to the pageserver. That's logically the LSN that we are
* requesting the page at, but we send UINT64_MAX to the pageserver so
* that if the GC horizon advances past that position, we still get a
* valid response instead of an error.
*
* To determine whether a response to a GetPage request issued earlier is
* still valid to satisfy a new page read, we look at the
@@ -3042,6 +3016,9 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
tag.blockNum = blocknum;
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_present[i] = ~(lfc_present[i]);
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
lfc_present, true);
@@ -3147,15 +3124,6 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
#endif
}
/*
* Read N pages at a specific LSN.
*
* *mask is set for pages read at a previous point in time, and which we
* should not touch, nor overwrite.
* New bits should be set in *mask for the pages we'successfully read.
*
* The offsets in request_lsns, buffers, and mask are linked.
*/
static void
#if PG_MAJORVERSION_NUM < 16
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
@@ -3208,7 +3176,7 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block
neon_request_lsns *reqlsns = &request_lsns[i];
TimestampTz start_ts, end_ts;
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
continue;
start_ts = GetCurrentTimestamp();
@@ -3309,12 +3277,6 @@ Retry:
}
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
* from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here
* under buffer lock.
*/
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forkNum, blockno, buffer);
break;
@@ -3507,7 +3469,9 @@ static void
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
{
bits8 read_pages[PG_IOV_MAX / 8];
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
bits8 lfc_hits[PG_IOV_MAX / 8];
bits8 read[PG_IOV_MAX / 8];
neon_request_lsns request_lsns[PG_IOV_MAX];
int lfc_result;
int prefetch_result;
@@ -3539,18 +3503,19 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
memset(read_pages, 0, sizeof(read_pages));
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum,
blocknum, request_lsns, nblocks,
buffers, read_pages);
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
if (prefetch_result == nblocks)
return;
/* invert the result: exclude prefetched blocks */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_hits[i] = ~prefetch_hits[i];
/* Try to read from local file cache */
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
nblocks, read_pages);
nblocks, lfc_hits);
if (lfc_result > 0)
MyNeonCounters->file_cache_hits_total += lfc_result;
@@ -3559,8 +3524,21 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
if (prefetch_result + lfc_result == nblocks)
return;
if (lfc_result <= 0)
{
/* can't use the LFC result, so read all blocks from PS */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~prefetch_hits[i];
}
else
{
/* invert the result: exclude blocks read from lfc */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
}
neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
buffers, nblocks, read_pages);
buffers, nblocks, read);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.

40
poetry.lock generated
View File

@@ -3111,30 +3111,30 @@ six = "*"
[[package]]
name = "ruff"
version = "0.11.2"
version = "0.7.0"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
groups = ["dev"]
files = [
{file = "ruff-0.11.2-py3-none-linux_armv6l.whl", hash = "sha256:c69e20ea49e973f3afec2c06376eb56045709f0212615c1adb0eda35e8a4e477"},
{file = "ruff-0.11.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:2c5424cc1c4eb1d8ecabe6d4f1b70470b4f24a0c0171356290b1953ad8f0e272"},
{file = "ruff-0.11.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:ecf20854cc73f42171eedb66f006a43d0a21bfb98a2523a809931cda569552d9"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0c543bf65d5d27240321604cee0633a70c6c25c9a2f2492efa9f6d4b8e4199bb"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:20967168cc21195db5830b9224be0e964cc9c8ecf3b5a9e3ce19876e8d3a96e3"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:955a9ce63483999d9f0b8f0b4a3ad669e53484232853054cc8b9d51ab4c5de74"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:86b3a27c38b8fce73bcd262b0de32e9a6801b76d52cdb3ae4c914515f0cef608"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a3b66a03b248c9fcd9d64d445bafdf1589326bee6fc5c8e92d7562e58883e30f"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0397c2672db015be5aa3d4dac54c69aa012429097ff219392c018e21f5085147"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:869bcf3f9abf6457fbe39b5a37333aa4eecc52a3b99c98827ccc371a8e5b6f1b"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:2a2b50ca35457ba785cd8c93ebbe529467594087b527a08d487cf0ee7b3087e9"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:7c69c74bf53ddcfbc22e6eb2f31211df7f65054bfc1f72288fc71e5f82db3eab"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6e8fb75e14560f7cf53b15bbc55baf5ecbe373dd5f3aab96ff7aa7777edd7630"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:842a472d7b4d6f5924e9297aa38149e5dcb1e628773b70e6387ae2c97a63c58f"},
{file = "ruff-0.11.2-py3-none-win32.whl", hash = "sha256:aca01ccd0eb5eb7156b324cfaa088586f06a86d9e5314b0eb330cb48415097cc"},
{file = "ruff-0.11.2-py3-none-win_amd64.whl", hash = "sha256:3170150172a8f994136c0c66f494edf199a0bbea7a409f649e4bc8f4d7084080"},
{file = "ruff-0.11.2-py3-none-win_arm64.whl", hash = "sha256:52933095158ff328f4c77af3d74f0379e34fd52f175144cefc1b192e7ccd32b4"},
{file = "ruff-0.11.2.tar.gz", hash = "sha256:ec47591497d5a1050175bdf4e1a4e6272cddff7da88a2ad595e1e326041d8d94"},
{file = "ruff-0.7.0-py3-none-linux_armv6l.whl", hash = "sha256:0cdf20c2b6ff98e37df47b2b0bd3a34aaa155f59a11182c1303cce79be715628"},
{file = "ruff-0.7.0-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:496494d350c7fdeb36ca4ef1c9f21d80d182423718782222c29b3e72b3512737"},
{file = "ruff-0.7.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:214b88498684e20b6b2b8852c01d50f0651f3cc6118dfa113b4def9f14faaf06"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:630fce3fefe9844e91ea5bbf7ceadab4f9981f42b704fae011bb8efcaf5d84be"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:211d877674e9373d4bb0f1c80f97a0201c61bcd1e9d045b6e9726adc42c156aa"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:194d6c46c98c73949a106425ed40a576f52291c12bc21399eb8f13a0f7073495"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:82c2579b82b9973a110fab281860403b397c08c403de92de19568f32f7178598"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9af971fe85dcd5eaed8f585ddbc6bdbe8c217fb8fcf510ea6bca5bdfff56040e"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b641c7f16939b7d24b7bfc0be4102c56562a18281f84f635604e8a6989948914"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d71672336e46b34e0c90a790afeac8a31954fd42872c1f6adaea1dff76fd44f9"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ab7d98c7eed355166f367597e513a6c82408df4181a937628dbec79abb2a1fe4"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:1eb54986f770f49edb14f71d33312d79e00e629a57387382200b1ef12d6a4ef9"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_i686.whl", hash = "sha256:dc452ba6f2bb9cf8726a84aa877061a2462afe9ae0ea1d411c53d226661c601d"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:4b406c2dce5be9bad59f2de26139a86017a517e6bcd2688da515481c05a2cb11"},
{file = "ruff-0.7.0-py3-none-win32.whl", hash = "sha256:f6c968509f767776f524a8430426539587d5ec5c662f6addb6aa25bc2e8195ec"},
{file = "ruff-0.7.0-py3-none-win_amd64.whl", hash = "sha256:ff4aabfbaaba880e85d394603b9e75d32b0693152e16fa659a3064a85df7fce2"},
{file = "ruff-0.7.0-py3-none-win_arm64.whl", hash = "sha256:10842f69c245e78d6adec7e1db0a7d9ddc2fff0621d730e61657b64fa36f207e"},
{file = "ruff-0.7.0.tar.gz", hash = "sha256:47a86360cf62d9cd53ebfb0b5eb0e882193fc191c6d717e8bef4462bc3b9ea2b"},
]
[[package]]
@@ -3844,4 +3844,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "fb50cb6b291169dce3188560cdb31a14af95647318f8f0f0d718131dbaf1817a"
content-hash = "715fc8c896dcfa1b15054deeddcdec557ef93af91b26e1c8e4688fe4dbef5296"

View File

@@ -314,9 +314,9 @@ pub async fn run() -> anyhow::Result<()> {
None => {
bail!("plain auth requires redis_notifications to be set");
}
Some(url) => {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
}
Some(url) => Some(
ConnectionWithCredentialsProvider::new_with_static_credentials(url.to_string()),
),
},
("irsa", _) => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(

View File

@@ -1,6 +1,5 @@
//! Mock console backend which relies on a user-provided postgres instance.
use std::io;
use std::net::{IpAddr, Ipv4Addr};
use std::str::FromStr;
use std::sync::Arc;
@@ -23,6 +22,7 @@ use crate::control_plane::errors::{
};
use crate::control_plane::messages::MetricsAuxInfo;
use crate::control_plane::{AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo};
use crate::error::io_error;
use crate::intern::RoleNameInt;
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
use crate::url::ApiUrl;
@@ -36,13 +36,13 @@ enum MockApiError {
impl From<MockApiError> for ControlPlaneError {
fn from(e: MockApiError) -> Self {
io::Error::other(e).into()
io_error(e).into()
}
}
impl From<tokio_postgres::Error> for ControlPlaneError {
fn from(e: tokio_postgres::Error) -> Self {
io::Error::other(e).into()
io_error(e).into()
}
}

View File

@@ -1,10 +1,8 @@
use std::io;
use thiserror::Error;
use crate::control_plane::client::ApiLockError;
use crate::control_plane::messages::{self, ControlPlaneErrorMessage, Reason};
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::error::{ErrorKind, ReportableError, UserFacingError, io_error};
use crate::proxy::retry::CouldRetry;
/// A go-to error message which doesn't leak any detail.
@@ -81,13 +79,13 @@ impl CouldRetry for ControlPlaneError {
impl From<reqwest::Error> for ControlPlaneError {
fn from(e: reqwest::Error) -> Self {
io::Error::other(e).into()
io_error(e).into()
}
}
impl From<reqwest_middleware::Error> for ControlPlaneError {
fn from(e: reqwest_middleware::Error) -> Self {
io::Error::other(e).into()
io_error(e).into()
}
}

View File

@@ -1,9 +1,15 @@
use std::fmt;
use std::error::Error as StdError;
use std::{fmt, io};
use anyhow::Context;
use measured::FixedCardinalityLabel;
use tokio::task::JoinError;
/// Upcast (almost) any error into an opaque [`io::Error`].
pub(crate) fn io_error(e: impl Into<Box<dyn StdError + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
/// Marks errors that may be safely shown to a client.
/// This trait can be seen as a specialized version of [`ToString`].
///

View File

@@ -163,7 +163,8 @@ fn process_proxy_payload(
// other values are unassigned and must not be emitted by senders. Receivers
// must drop connections presenting unexpected values here.
#[rustfmt::skip] // https://github.com/rust-lang/rustfmt/issues/6384
_ => return Err(io::Error::other(
_ => return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"invalid proxy protocol command 0x{:02X}. expected local (0x20) or proxy (0x21)",
header.version_and_command
@@ -177,20 +178,21 @@ fn process_proxy_payload(
TCP_OVER_IPV4 | UDP_OVER_IPV4 => {
let addr = payload
.try_get::<ProxyProtocolV2HeaderV4>()
.ok_or_else(|| io::Error::other(size_err))?;
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
}
TCP_OVER_IPV6 | UDP_OVER_IPV6 => {
let addr = payload
.try_get::<ProxyProtocolV2HeaderV6>()
.ok_or_else(|| io::Error::other(size_err))?;
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
}
// unspecified or unix stream. ignore the addresses
_ => {
return Err(io::Error::other(
return Err(io::Error::new(
io::ErrorKind::Other,
"invalid proxy protocol address family/transport protocol.",
));
}

View File

@@ -143,8 +143,6 @@ impl ConnectionWithCredentialsProvider {
db: 0,
username: Some(username),
password: Some(password.clone()),
// TODO: switch to RESP3 after testing new client version.
protocol: redis::ProtocolVersion::RESP2,
},
})
}

View File

@@ -19,7 +19,7 @@ fn json_value_to_pg_text(value: &Value) -> Option<String> {
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
// avoid escaping here, as we pass this as a parameter
Value::String(s) => Some(s.clone()),
Value::String(s) => Some(s.to_string()),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),

View File

@@ -866,7 +866,7 @@ impl QueryData {
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
match select(
let res = match select(
pin!(query_to_json(
config,
&mut *inner,
@@ -889,7 +889,7 @@ impl QueryData {
// The query failed with an error
Either::Left((Err(e), __not_yet_cancelled)) => {
discard.discard();
Err(e)
return Err(e);
}
// The query was cancelled.
Either::Right((_cancelled, query)) => {
@@ -930,7 +930,8 @@ impl QueryData {
}
}
}
}
};
res
}
}

View File

@@ -15,7 +15,7 @@ use tracing::warn;
use crate::cancellation::CancellationHandler;
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::error::{ReportableError, io_error};
use crate::metrics::Metrics;
use crate::proxy::{ClientMode, ErrorSource, handle_client};
use crate::rate_limiter::EndpointRateLimiter;
@@ -50,23 +50,23 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
let this = self.project();
let mut stream = this.stream;
ready!(stream.as_mut().poll_ready(cx).map_err(io::Error::other))?;
ready!(stream.as_mut().poll_ready(cx).map_err(io_error))?;
this.send.put(buf);
match stream.as_mut().start_send(Frame::binary(this.send.split())) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(e) => Poll::Ready(Err(io::Error::other(e))),
Err(e) => Poll::Ready(Err(io_error(e))),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = self.project().stream;
stream.poll_flush(cx).map_err(io::Error::other)
stream.poll_flush(cx).map_err(io_error)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = self.project().stream;
stream.poll_close(cx).map_err(io::Error::other)
stream.poll_close(cx).map_err(io_error)
}
}
@@ -97,7 +97,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
}
let res = ready!(this.stream.as_mut().poll_next(cx));
match res.transpose().map_err(io::Error::other)? {
match res.transpose().map_err(io_error)? {
Some(message) => match message.opcode {
OpCode::Ping => {}
OpCode::Pong => {}
@@ -105,7 +105,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
// We expect to see only binary messages.
let error = "unexpected text message in the websocket";
warn!(length = message.payload.len(), error);
return Poll::Ready(Err(io::Error::other(error)));
return Poll::Ready(Err(io_error(error)));
}
OpCode::Binary | OpCode::Continuation => {
debug_assert!(this.recv.is_empty());

View File

@@ -173,7 +173,7 @@ impl CertResolver {
}
pub fn get_common_names(&self) -> HashSet<String> {
self.certs.keys().cloned().collect()
self.certs.keys().map(|s| s.to_string()).collect()
}
}

View File

@@ -53,7 +53,7 @@ jsonnet = "^0.21.0-rc2"
[tool.poetry.group.dev.dependencies]
mypy = "==1.13.0"
ruff = "^0.11.2"
ruff = "^0.7.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
@@ -109,5 +109,4 @@ select = [
"W", # pycodestyle
"B", # bugbear
"UP", # pyupgrade
"TC", # flake8-type-checking
]

View File

@@ -38,8 +38,9 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
#[error("request timed out: {0}")]
Timeout(String),
/// Failed to create client.
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -119,12 +120,6 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
let resp = self.request(Method::DELETE, &uri, ()).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn bump_timeline_term(
&self,
tenant_id: TenantId,

View File

@@ -21,7 +21,7 @@ use safekeeper::defaults::{
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
DEFAULT_SSL_KEY_FILE,
};
use safekeeper::{
BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, WAL_SERVICE_RUNTIME, broker,
@@ -214,10 +214,7 @@ struct Args {
/// Path to a file with a X509 certificate for https API.
#[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificates to use in https APIs.
/// Trusted root CA certificate to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<Utf8PathBuf>,
}
@@ -353,13 +350,13 @@ async fn main() -> anyhow::Result<()> {
}
};
let ssl_ca_certs = match args.ssl_ca_file.as_ref() {
let ssl_ca_cert = match args.ssl_ca_file.as_ref() {
Some(ssl_ca_file) => {
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
let buf = tokio::fs::read(ssl_ca_file).await?;
Certificate::from_pem_bundle(&buf)?
Some(Certificate::from_pem(&buf)?)
}
None => Vec::new(),
None => None,
};
let conf = Arc::new(SafeKeeperConf {
@@ -397,8 +394,7 @@ async fn main() -> anyhow::Result<()> {
max_delta_for_fanout: args.max_delta_for_fanout,
ssl_key_file: args.ssl_key_file,
ssl_cert_file: args.ssl_cert_file,
ssl_cert_reload_period: args.ssl_cert_reload_period,
ssl_ca_certs,
ssl_ca_cert,
});
// initialize sentry if SENTRY_DSN is provided

View File

@@ -1,7 +1,6 @@
pub mod routes;
use std::sync::Arc;
use http_utils::tls_certs::ReloadingCertificateResolver;
pub use routes::make_router;
pub use safekeeper_api::models;
use tokio_util::sync::CancellationToken;
@@ -30,16 +29,12 @@ pub async fn task_main_https(
https_listener: std::net::TcpListener,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let cert_resolver = ReloadingCertificateResolver::new(
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,
)
.await?;
let certs = http_utils::tls_certs::load_cert_chain(&conf.ssl_cert_file)?;
let key = http_utils::tls_certs::load_private_key(&conf.ssl_key_file)?;
let server_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
.with_single_cert(certs, key)?;
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));

View File

@@ -235,7 +235,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
let resp = pull_timeline::handle_request(
data,
conf.sk_auth_token.clone(),
conf.ssl_ca_certs.clone(),
conf.ssl_ca_cert.clone(),
global_timelines,
)
.await

View File

@@ -73,7 +73,6 @@ pub mod defaults {
pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
pub const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
}
#[derive(Debug, Clone)]
@@ -119,8 +118,7 @@ pub struct SafeKeeperConf {
pub max_delta_for_fanout: Option<u64>,
pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf,
pub ssl_cert_reload_period: Duration,
pub ssl_ca_certs: Vec<Certificate>,
pub ssl_ca_cert: Option<Certificate>,
}
impl SafeKeeperConf {
@@ -168,8 +166,7 @@ impl SafeKeeperConf {
max_delta_for_fanout: None,
ssl_key_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_certs: Vec::new(),
ssl_ca_cert: None,
}
}
}

View File

@@ -393,7 +393,7 @@ pub struct DebugDumpResponse {
pub async fn handle_request(
request: PullTimelineRequest,
sk_auth_token: Option<SecretString>,
ssl_ca_certs: Vec<Certificate>,
ssl_ca_cert: Option<Certificate>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<PullTimelineResponse> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
@@ -405,7 +405,7 @@ pub async fn handle_request(
}
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs {
if let Some(ssl_ca_cert) = ssl_ca_cert {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;

View File

@@ -182,8 +182,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
max_delta_for_fanout: None,
ssl_key_file: Utf8PathBuf::from(""),
ssl_cert_file: Utf8PathBuf::from(""),
ssl_cert_reload_period: Duration::ZERO,
ssl_ca_certs: Vec::new(),
ssl_ca_cert: None,
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -8,12 +8,9 @@
from __future__ import annotations
import argparse
from typing import TYPE_CHECKING
import psycopg2
if TYPE_CHECKING:
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import connection as PgConnection
def main(args: argparse.Namespace):

View File

@@ -7,13 +7,13 @@ import logging
import signal
import sys
from collections import defaultdict
from collections.abc import Awaitable
from dataclasses import dataclass
from typing import TYPE_CHECKING
import aiohttp
if TYPE_CHECKING:
from collections.abc import Awaitable
from typing import Any

View File

@@ -16,11 +16,10 @@ testing = []
[dependencies]
anyhow.workspace = true
bytes.workspace = true
camino.workspace = true
chrono.workspace = true
clap.workspace = true
clashmap.workspace = true
cron.workspace = true
clashmap.workspace = true
fail.workspace = true
futures.workspace = true
governor.workspace = true
@@ -45,9 +44,8 @@ rustls-native-certs.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
measured.workspace = true
rustls.workspace = true

View File

@@ -624,19 +624,16 @@ impl ComputeHook {
MaybeSendResult::Transmit((request, lock)) => (request, lock),
};
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
self.config.compute_hook_url.clone()
};
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let result = if let Some(notify_url) = &compute_hook_url {
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {

View File

@@ -8,6 +8,7 @@ use futures::StreamExt;
use futures::stream::FuturesUnordered;
use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
use pageserver_api::models::PageserverUtilization;
use reqwest::Certificate;
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use thiserror::Error;
@@ -26,8 +27,8 @@ struct HeartbeaterTask<Server, State> {
max_offline_interval: Duration,
max_warming_up_interval: Duration,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
}
#[derive(Debug, Clone)]
@@ -75,8 +76,8 @@ where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
pub(crate) fn new(
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -85,8 +86,8 @@ where
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
let mut heartbeater = HeartbeaterTask::new(
receiver,
http_client,
jwt_token,
ssl_ca_cert,
max_offline_interval,
max_warming_up_interval,
cancel,
@@ -121,8 +122,8 @@ where
{
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -133,8 +134,8 @@ where
state: HashMap::new(),
max_offline_interval,
max_warming_up_interval,
http_client,
jwt_token,
ssl_ca_cert,
}
}
async fn run(&mut self) {
@@ -177,7 +178,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, node) in &*pageservers {
heartbeat_futs.push({
let http_client = self.http_client.clone();
let ssl_ca_cert = self.ssl_ca_cert.clone();
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
@@ -192,8 +193,8 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let response = node_clone
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),
@@ -328,19 +329,19 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
continue;
}
heartbeat_futs.push({
let http_client = self.http_client.clone();
let jwt_token = self
.jwt_token
.as_ref()
.map(|t| SecretString::from(t.to_owned()));
let ssl_ca_cert = self.ssl_ca_cert.clone();
let cancel = self.cancel.clone();
async move {
let response = sk
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),

View File

@@ -24,9 +24,9 @@ use pageserver_api::controller_api::{
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
};
use pageserver_api::models::{
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest,
DetachBehavior, TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantShardSplitRequest, TenantTimeTravelRequest, TimelineArchivalConfigRequest,
TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
@@ -582,32 +582,6 @@ async fn handle_tenant_timeline_download_heatmap_layers(
json_response(StatusCode::OK, ())
}
async fn handle_tenant_timeline_lsn_lease(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let lsn_lease_request = json_request::<LsnLeaseRequest>(&mut req).await?;
service
.tenant_timeline_lsn_lease(tenant_id, timeline_id, lsn_lease_request.lsn)
.await?;
json_response(StatusCode::OK, ())
}
// For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
// and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
// compare to, so we can just filter out our well known ID format with regexes.
@@ -682,10 +656,11 @@ async fn handle_tenant_timeline_passthrough(
let _timer = latency.start_timer(labels.clone());
let client = mgmt_api::Client::new(
service.get_http_client().clone(),
node.base_url(),
service.get_config().pageserver_jwt_token.as_deref(),
);
service.get_config().ssl_ca_cert.clone(),
)
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
let resp = client.get_raw(path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
@@ -2218,17 +2193,6 @@ pub fn make_router(
)
},
)
// LSN lease passthrough to all shards
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_lsn_lease,
RequestName("v1_tenant_timeline_lsn_lease"),
)
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(

Some files were not shown because too many files have changed in this diff Show More