Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-17 10:22:56 +00:00
Compare commits
29 Commits
cloneable/ ... conrad/fla
| Author | SHA1 | Date |
|---|---|---|
|  | b0411e612a |  |
|  | 0579533ac8 |  |
|  | 99c0da9607 |  |
|  | c179d098ef |  |
|  | 4bc6dbdd5f |  |
|  | 7dc8370848 |  |
|  | c4fc602115 |  |
|  | 02936b82c5 |  |
|  | 1fad1abb24 |  |
|  | 016068b966 |  |
|  | 80596feeaa |  |
|  | 225cabd84d |  |
|  | 557127550c |  |
|  | cfe3e6d4e1 |  |
|  | 47d47000df |  |
|  | e5b95bc9dc |  |
|  | 0ee5bfa2fc |  |
|  | 00bcafe82e |  |
|  | ed117af73e |  |
|  | 21a891a06d |  |
|  | 30a7dd630c |  |
|  | db5384e1b0 |  |
|  | 5cb6a4bc8b |  |
|  | 1dbf40ee2c |  |
|  | 939354abea |  |
|  | 1d5d168626 |  |
|  | b40dd54732 |  |
|  | 4bb7087d4d |  |
|  | 5f3551e405 |  |
27 .github/actionlint.yml vendored

@@ -8,6 +8,7 @@ self-hosted-runner:
- small-arm64
- us-east-2
config-variables:
- AWS_ECR_REGION
- AZURE_DEV_CLIENT_ID
- AZURE_DEV_REGISTRY_NAME
- AZURE_DEV_SUBSCRIPTION_ID

@@ -15,23 +16,25 @@ config-variables:
- AZURE_PROD_REGISTRY_NAME
- AZURE_PROD_SUBSCRIPTION_ID
- AZURE_TENANT_ID
- BENCHMARK_INGEST_TARGET_PROJECTID
- BENCHMARK_LARGE_OLTP_PROJECTID
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
- DEV_AWS_OIDC_ROLE_ARN
- BENCHMARK_INGEST_TARGET_PROJECTID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- SLACK_ON_CALL_QA_STAGING_STREAM
- DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_CICD_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- HETZNER_CACHE_BUCKET
- HETZNER_CACHE_ENDPOINT
- HETZNER_CACHE_REGION
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- AWS_ECR_REGION
- BENCHMARK_LARGE_OLTP_PROJECTID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID
- SLACK_ON_CALL_DEVPROD_STREAM
- SLACK_ON_CALL_QA_STAGING_STREAM
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_RUST_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
28 .github/workflows/_build-and-test-locally.yml vendored

@@ -128,29 +128,49 @@ jobs:

- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}

- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}

- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}

- name: Cache postgres v17 build
id: cache_pg_17
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}

@@ -37,8 +37,14 @@ jobs:

- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2

- uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
- name: Cache poetry deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
7 .github/workflows/_check-codestyle-rust.yml vendored

@@ -48,8 +48,13 @@ jobs:
submodules: true

- name: Cache cargo deps
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src
30 .github/workflows/_meta.yml vendored

@@ -5,6 +5,9 @@ on:
github-event-name:
type: string
required: true
github-event-json:
type: string
required: true
outputs:
build-tag:
description: "Tag for the current workflow run"

@@ -27,6 +30,9 @@ on:
release-pr-run-id:
description: "Only available if `run-kind in [storage-release, proxy-release, compute-release]`. Contains the run ID of the `Build and Test` workflow, assuming one with the current commit can be found."
value: ${{ jobs.tags.outputs.release-pr-run-id }}
sha:
description: "github.event.pull_request.head.sha on release PRs, github.sha otherwise"
value: ${{ jobs.tags.outputs.sha }}

permissions: {}

@@ -45,6 +51,7 @@ jobs:
storage: ${{ steps.previous-releases.outputs.storage }}
run-kind: ${{ steps.run-kind.outputs.run-kind }}
release-pr-run-id: ${{ steps.release-pr-run-id.outputs.release-pr-run-id }}
sha: ${{ steps.sha.outputs.sha }}
permissions:
contents: read
steps:

@@ -54,10 +61,6 @@ jobs:
with:
egress-policy: audit

- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0

- name: Get run kind
id: run-kind
env:

@@ -78,6 +81,23 @@ jobs:
run: |
echo "run-kind=$RUN_KIND" | tee -a $GITHUB_OUTPUT

- name: Get the right SHA
id: sha
env:
SHA: >
${{
contains(fromJSON('["storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), steps.run-kind.outputs.run-kind)
&& fromJSON(inputs.github-event-json).pull_request.head.sha
|| github.sha
}}
run: |
echo "sha=$SHA" | tee -a $GITHUB_OUTPUT

- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
ref: ${{ steps.sha.outputs.sha }}

- name: Get build tag
id: build-tag
env:

@@ -143,7 +163,7 @@ jobs:
if: ${{ contains(fromJSON('["storage-release", "compute-release", "proxy-release"]'), steps.run-kind.outputs.run-kind) }}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
CURRENT_SHA: ${{ github.sha }}
run: |
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release(-(proxy|compute))?/[0-9]{4}-[0-9]{2}-[0-9]{2}$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
echo "release-pr-run-id=$RELEASE_PR_RUN_ID" | tee -a $GITHUB_OUTPUT
63 .github/workflows/build-macos.yml vendored

@@ -63,8 +63,13 @@ jobs:

- name: Cache postgres ${{ matrix.postgres-version }} build
id: cache_pg
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/${{ matrix.postgres-version }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

@@ -129,15 +134,25 @@ jobs:

- name: Cache postgres v17 build
id: cache_pg
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

@@ -203,32 +218,57 @@ jobs:

- name: Cache postgres v14 build
id: cache_pg
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_v15
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_v16
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v17 build
id: cache_pg_v17
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

- name: Cache cargo deps (only for v17)
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src

@@ -238,8 +278,13 @@ jobs:

- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
11 .github/workflows/build_and_test.yml vendored

@@ -80,6 +80,7 @@ jobs:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}

build-build-tools-image:
needs: [ check-permissions ]

@@ -248,8 +249,13 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2

- name: Cache poetry deps
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}

@@ -540,6 +546,7 @@ jobs:
uses: ./.github/workflows/trigger-e2e-tests.yml
with:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}
secrets: inherit

neon-image-arch:

@@ -563,6 +570,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
ref: ${{ needs.meta.outputs.sha }}

- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
- uses: docker/setup-buildx-action@b5ca514318bd6ebac0fb2aedd5d36ec1b5c232a2 # v3.10.0

@@ -672,6 +680,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
ref: ${{ needs.meta.outputs.sha }}

- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
- uses: docker/setup-buildx-action@b5ca514318bd6ebac0fb2aedd5d36ec1b5c232a2 # v3.10.0

@@ -55,7 +55,7 @@ jobs:
echo tag=${tag} >> ${GITHUB_OUTPUT}

- name: Test extension upgrade
timeout-minutes: 20
timeout-minutes: 60
env:
NEW_COMPUTE_TAG: latest
OLD_COMPUTE_TAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}

@@ -23,7 +23,7 @@ jobs:
egress-policy: audit

- name: Export Workflow Run for the past 2 hours
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"

@@ -43,7 +43,7 @@ jobs:
egress-policy: audit

- name: Export Workflow Run for the past 48 hours
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"

@@ -63,7 +63,7 @@ jobs:
egress-policy: audit

- name: Export Workflow Run for the past 30 days
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
4 .github/workflows/trigger-e2e-tests.yml vendored

@@ -9,6 +9,9 @@ on:
github-event-name:
type: string
required: true
github-event-json:
type: string
required: true

defaults:
run:

@@ -48,6 +51,7 @@ jobs:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ inputs.github-event-name || github.event_name }}
github-event-json: ${{ inputs.github-event-json || toJSON(github.event) }}

trigger-e2e-tests:
needs: [ meta ]
64 Cargo.lock generated

@@ -148,9 +148,9 @@ dependencies = [

[[package]]
name = "arc-swap"
version = "1.6.0"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"

[[package]]
name = "archery"

@@ -2248,6 +2248,17 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"

[[package]]
name = "flag-bearer"
version = "0.1.0-rc.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8feaa1b7a5ad6e6dd7791d42d36c1a25004f1c25eae9ab7b904c864109d8260"
dependencies = [
"parking_lot 0.12.1",
"pin-list",
"pin-project-lite",
]

[[package]]
name = "flagset"
version = "0.4.6"

@@ -3861,11 +3872,10 @@ dependencies = [

[[package]]
name = "num-bigint"
version = "0.4.3"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]

@@ -3914,11 +3924,10 @@ dependencies = [

[[package]]
name = "num-integer"
version = "0.1.45"
version = "0.1.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
dependencies = [
"autocfg",
"num-traits",
]

@@ -3947,9 +3956,9 @@ dependencies = [

[[package]]
name = "num-traits"
version = "0.2.15"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
"libm",

@@ -4556,6 +4565,16 @@ dependencies = [
"siphasher",
]

[[package]]
name = "pin-list"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3c0987a7464afc0d593f13429732ef87e9a6c8e7909a1a22faeff7e1d2159d"
dependencies = [
"pin-project-lite",
"pinned-aliasable",
]

[[package]]
name = "pin-project"
version = "1.1.9"

@@ -4588,6 +4607,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"

[[package]]
name = "pinned-aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d0f9ae89bf0ed03b69ac1f3f7ea2e6e09b4fa5448011df2e67d581c2b850b7b"

[[package]]
name = "pkcs1"
version = "0.7.5"

@@ -5081,6 +5106,7 @@ dependencies = [
"ed25519-dalek",
"env_logger",
"fallible-iterator",
"flag-bearer",
"flate2",
"framed-websockets",
"futures",

@@ -5362,26 +5388,25 @@ dependencies = [

[[package]]
name = "redis"
version = "0.25.2"
version = "0.29.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d64e978fd98a0e6b105d066ba4889a7301fca65aeac850a877d8797343feeb"
checksum = "b110459d6e323b7cda23980c46c77157601199c9da6241552b284cd565a7a133"
dependencies = [
"async-trait",
"arc-swap",
"bytes",
"combine",
"futures-util",
"itoa",
"num-bigint",
"percent-encoding",
"pin-project-lite",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"rustls-pemfile 2.1.1",
"rustls-pki-types",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"ryu",
"sha1_smol",
"socket2",
"tokio",
"tokio-rustls 0.25.0",
"tokio-rustls 0.26.0",
"tokio-util",
"url",
]

@@ -7217,15 +7242,14 @@ dependencies = [
"bytes",
"fallible-iterator",
"futures-util",
"log",
"parking_lot 0.12.1",
"phf",
"pin-project-lite",
"postgres-protocol2",
"postgres-types2",
"serde",
"tokio",
"tokio-util",
"tracing",
]

[[package]]
@@ -50,7 +50,7 @@ license = "Apache-2.0"
[workspace.dependencies]
ahash = "0.8"
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
arc-swap = "1.7"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
flate2 = "1.0.26"

@@ -130,7 +130,7 @@ nix = { version = "0.27", features = ["dir", "fs", "process", "socket", "signal"
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.15"
num-traits = "0.2.19"
once_cell = "1.13"
opentelemetry = "0.27"
opentelemetry_sdk = "0.27"

@@ -146,7 +146,7 @@ procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13"
rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
@@ -1916,26 +1916,30 @@ RUN apt update && \
;; \
esac && \
apt install --no-install-recommends -y \
ca-certificates \
gdb \
liblz4-1 \
libreadline8 \
iproute2 \
libboost-iostreams1.74.0 \
libboost-regex1.74.0 \
libboost-serialization1.74.0 \
libboost-system1.74.0 \
libossp-uuid16 \
libcurl4 \
libevent-2.1-7 \
libgeos-c1v5 \
liblz4-1 \
libossp-uuid16 \
libprotobuf-c1 \
libreadline8 \
libsfcgal1 \
libxml2 \
libxslt1.1 \
libzstd1 \
libcurl4 \
libevent-2.1-7 \
locales \
lsof \
procps \
ca-certificates \
rsyslog \
screen \
tcpdump \
$VERSION_INSTALLS && \
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
@@ -45,7 +45,9 @@ use anyhow::{Context, Result};
use clap::Parser;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal};
use compute_tools::compute::{
BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
};
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;

@@ -57,10 +59,6 @@ use tracing::{error, info};
use url::Url;
use utils::failpoint_support;

// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";

// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL

@@ -147,7 +145,7 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();

let build_tag = runtime.block_on(init())?;
runtime.block_on(init())?;

// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;

@@ -174,8 +172,6 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
build_tag,

live_config_allowed: cli_spec.live_config_allowed,
},
cli_spec.spec,

@@ -189,7 +185,7 @@ fn main() -> Result<()> {
deinit_and_exit(exit_code);
}

async fn init() -> Result<String> {
async fn init() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;

let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;

@@ -199,12 +195,9 @@ async fn init() -> Result<String> {
}
});

let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
info!("build_tag: {build_tag}");
info!("compute build_tag: {}", &BUILD_TAG.to_string());

Ok(build_tag)
Ok(())
}

fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
@@ -20,6 +20,7 @@ use futures::future::join_all;
use futures::stream::FuturesUnordered;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use postgres;
use postgres::NoTls;
use postgres::error::SqlState;

@@ -35,6 +36,7 @@ use crate::disk_quota::set_disk_quota;
use crate::installed_extensions::get_installed_extensions;
use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
use crate::monitor::launch_monitor;
use crate::pg_helpers::*;
use crate::rsyslog::{

@@ -49,6 +51,17 @@ use crate::{config, extension_server, local_proxy};

pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
// This is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
/// Build tag/version of the compute node binaries/image. It's tricky and ugly
/// to pass it everywhere as a part of `ComputeNodeParams`, so we use a
/// global static variable.
pub static BUILD_TAG: Lazy<String> = Lazy::new(|| {
option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string()
});

/// Static configuration params that don't change after startup. These mostly
/// come from the CLI args, or are derived from them.
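The hunk above moves the build tag out of `ComputeNodeParams` and into a process-wide `once_cell::sync::Lazy<String>`. A minimal standalone sketch of that pattern follows; it mirrors the code in the diff rather than the full compute_tools crate, and the `main` function is only there to make the sketch runnable (it assumes the `once_cell` crate is available).

```rust
use once_cell::sync::Lazy;

// Fallback used when BUILD_TAG was not set at compile time (as in the diff).
const BUILD_TAG_DEFAULT: &str = "latest";

// option_env! is resolved at compile time, so the tag is baked into the
// binary; Lazy defers allocating the String until first use, and every call
// site can read the global instead of threading a `build_tag` field around.
pub static BUILD_TAG: Lazy<String> = Lazy::new(|| {
    option_env!("BUILD_TAG")
        .unwrap_or(BUILD_TAG_DEFAULT)
        .to_string()
});

fn main() {
    // Matches the new log line in compute_ctl's init().
    println!("compute build_tag: {}", &*BUILD_TAG);
}
```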
@@ -72,7 +85,6 @@ pub struct ComputeNodeParams {
pub pgdata: String,
pub pgbin: String,
pub pgversion: String,
pub build_tag: String,

/// The port that the compute's external HTTP server listens on
pub external_http_port: u16,

@@ -173,6 +185,11 @@ impl ComputeState {
info!("Changing compute status from {} to {}", prev, status);
self.status = status;
state_changed.notify_all();

COMPUTE_CTL_UP.reset();
COMPUTE_CTL_UP
.with_label_values(&[&BUILD_TAG, format!("{}", status).as_str()])
.set(1);
}

pub fn set_failed_status(&mut self, err: anyhow::Error, state_changed: &Condvar) {

@@ -352,13 +369,19 @@ impl ComputeNode {
}
.launch(&this);

// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
// The internal HTTP server is needed for a further activation by control plane
// if compute was started for a pool, so we have to start server before hanging
// waiting for a spec.
crate::http::server::Server::Internal {
port: this.params.internal_http_port,
}
.launch(&this);

// HTTP server is running, so we can officially declare compute_ctl as 'up'
COMPUTE_CTL_UP
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
.set(1);

// If we got a spec from the CLI already, use that. Otherwise wait for the
// control plane to pass it to us with a /configure HTTP request
let pspec = if let Some(cli_spec) = cli_spec {

@@ -2032,12 +2055,8 @@ LIMIT 100",

let mut download_tasks = Vec::new();
for library in &libs_vec {
let (ext_name, ext_path) = remote_extensions.get_ext(
library,
true,
&self.params.build_tag,
&self.params.pgversion,
)?;
let (ext_name, ext_path) =
remote_extensions.get_ext(library, true, &BUILD_TAG, &self.params.pgversion)?;
download_tasks.push(self.download_extension(ext_name, ext_path));
}
let results = join_all(download_tasks).await;
@@ -5,7 +5,7 @@ use axum::response::{IntoResponse, Response};
use http::StatusCode;
use serde::Deserialize;

use crate::compute::ComputeNode;
use crate::compute::{BUILD_TAG, ComputeNode};
use crate::http::JsonResponse;
use crate::http::extract::{Path, Query};

@@ -47,7 +47,7 @@ pub(in crate::http) async fn download_extension(
remote_extensions.get_ext(
&filename,
ext_server_params.is_library,
&compute.params.build_tag,
&BUILD_TAG,
&compute.params.pgversion,
)
};
@@ -1,7 +1,8 @@
use metrics::core::{AtomicF64, Collector, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, UIntGaugeVec, register_gauge, register_int_counter_vec, register_uint_gauge_vec,
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;

@@ -70,8 +71,19 @@ pub(crate) static AUDIT_LOG_DIR_SIZE: Lazy<GenericGauge<AtomicF64>> = Lazy::new(
.expect("failed to define a metric")
});

// Report that `compute_ctl` is up and what's the current compute status.
pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"compute_ctl_up",
"Whether compute_ctl is running",
&["build_tag", "status"]
)
.expect("failed to define a metric")
});

pub fn collect() -> Vec<MetricFamily> {
let mut metrics = INSTALLED_EXTENSIONS.collect();
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
metrics.extend(CPLANE_REQUESTS_TOTAL.collect());
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());
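The new `compute_ctl_up` gauge is labelled by build tag and status, and the status-change path in compute.rs resets the whole vector before setting the current label pair, so only one combination reports 1 at a time. A hedged sketch of that reset-then-set pattern is below; it uses the `prometheus` crate directly (the in-tree `metrics` crate wraps it, so the exact macro paths here are an assumption), and the statuses are illustrative.

```rust
use once_cell::sync::Lazy;
use prometheus::{IntGaugeVec, register_int_gauge_vec};

// Gauge labelled by build tag and current status, mirroring compute_ctl_up.
static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "compute_ctl_up",
        "Whether compute_ctl is running",
        &["build_tag", "status"]
    )
    .expect("failed to define a metric")
});

fn report_status(build_tag: &str, status: &str) {
    // Drop all previously set (build_tag, status) label pairs so stale
    // statuses don't keep reporting 1 after a transition.
    COMPUTE_CTL_UP.reset();
    COMPUTE_CTL_UP.with_label_values(&[build_tag, status]).set(1);
}

fn main() {
    report_status("latest", "empty");
    report_status("latest", "running");
}
```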
@@ -428,11 +428,6 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'l0_flush_delay_threshold' as an integer")?,
l0_flush_wait_upload: settings
.remove("l0_flush_wait_upload")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'l0_flush_wait_upload' as a boolean")?,
l0_flush_stall_threshold: settings
.remove("l0_flush_stall_threshold")
.map(|x| x.parse::<usize>())
@@ -285,12 +285,6 @@ pub struct TenantConfigToml {
/// Level0 delta layer threshold at which to stall layer flushes. Must be >compaction_threshold
/// to avoid deadlock. 0 to disable. Disabled by default.
pub l0_flush_stall_threshold: Option<usize>,
/// If true, Level0 delta layer flushes will wait for S3 upload before flushing the next
/// layer. This is a temporary backpressure mechanism which should be removed once
/// l0_flush_{delay,stall}_threshold is fully enabled.
///
/// TODO: this is no longer enabled, remove it when the config option is no longer set.
pub l0_flush_wait_upload: bool,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is #of bytes of WAL.

@@ -579,8 +573,6 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
crate::models::CompactionAlgorithm::Legacy;

pub const DEFAULT_L0_FLUSH_WAIT_UPLOAD: bool = false;

pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;

// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.

@@ -627,7 +619,6 @@ impl Default for TenantConfigToml {
compaction_l0_semaphore: DEFAULT_COMPACTION_L0_SEMAPHORE,
l0_flush_delay_threshold: None,
l0_flush_stall_threshold: None,
l0_flush_wait_upload: DEFAULT_L0_FLUSH_WAIT_UPLOAD,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),

@@ -523,8 +523,6 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_stall_threshold: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_wait_upload: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_horizon: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_period: FieldPatch<String>,

@@ -614,9 +612,6 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub l0_flush_stall_threshold: Option<usize>,

#[serde(skip_serializing_if = "Option::is_none")]
pub l0_flush_wait_upload: Option<bool>,

#[serde(skip_serializing_if = "Option::is_none")]
pub gc_horizon: Option<u64>,

@@ -712,7 +707,6 @@ impl TenantConfig {
mut compaction_l0_semaphore,
mut l0_flush_delay_threshold,
mut l0_flush_stall_threshold,
mut l0_flush_wait_upload,
mut gc_horizon,
mut gc_period,
mut image_creation_threshold,

@@ -765,7 +759,6 @@ impl TenantConfig {
patch
.l0_flush_stall_threshold
.apply(&mut l0_flush_stall_threshold);
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
patch.gc_horizon.apply(&mut gc_horizon);
patch
.gc_period

@@ -844,7 +837,6 @@ impl TenantConfig {
compaction_l0_semaphore,
l0_flush_delay_threshold,
l0_flush_stall_threshold,
l0_flush_wait_upload,
gc_horizon,
gc_period,
image_creation_threshold,

@@ -911,9 +903,6 @@ impl TenantConfig {
l0_flush_stall_threshold: self
.l0_flush_stall_threshold
.or(global_conf.l0_flush_stall_threshold),
l0_flush_wait_upload: self
.l0_flush_wait_upload
.unwrap_or(global_conf.l0_flush_wait_upload),
gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
image_creation_threshold: self
@@ -8,10 +8,9 @@ license = "MIT/Apache-2.0"
bytes.workspace = true
fallible-iterator.workspace = true
futures-util = { workspace = true, features = ["sink"] }
log = "0.4"
tracing.workspace = true
parking_lot.workspace = true
pin-project-lite.workspace = true
phf = "0.11"
postgres-protocol2 = { path = "../postgres-protocol2" }
postgres-types2 = { path = "../postgres-types2" }
tokio = { workspace = true, features = ["io-util", "time", "net"] }

@@ -6,13 +6,13 @@ use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, Stream, ready};
use log::{info, trace};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tokio_util::sync::PollSender;
use tracing::{info, trace};

use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::error::DbError;
File diff suppressed because it is too large
@@ -5,9 +5,9 @@ use std::sync::Arc;
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, pin_mut};
use log::debug;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;

use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;

@@ -7,11 +7,11 @@ use std::task::{Context, Poll};
use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::{Level, debug, log_enabled};
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
use tracing::debug;

use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;

@@ -36,7 +36,7 @@ where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let buf = if log_enabled!(Level::Debug) {
let buf = if tracing::enabled!(tracing::Level::DEBUG) {
let params = params.into_iter().collect::<Vec<_>>();
debug!(
"executing statement {} with parameters: {:?}",

@@ -6,10 +6,10 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;

use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
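These hunks move the postgres2 crates from `log` to `tracing`; the one non-mechanical piece is replacing the `log_enabled!(Level::Debug)` guard with `tracing::enabled!`. A hedged, self-contained sketch of that guard pattern is below; the subscriber setup and the parameter type are assumptions made only so the sketch compiles on its own.

```rust
use tracing::{Level, debug};

fn log_params(params: &[i64]) {
    // With `log`, this guard was `log_enabled!(log::Level::Debug)`; the
    // tracing equivalent checks whether any subscriber enables DEBUG before
    // paying for the collection/formatting below.
    if tracing::enabled!(Level::DEBUG) {
        let rendered: Vec<String> = params.iter().map(|p| p.to_string()).collect();
        debug!("executing statement with parameters: {:?}", rendered);
    }
}

fn main() {
    // Assumed setup for the sketch: tracing_subscriber is available.
    tracing_subscriber::fmt().with_max_level(Level::DEBUG).init();
    log_params(&[1, 2, 3]);
}
```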
@@ -1133,6 +1133,40 @@ components:
applied_gc_cutoff_lsn:
type: string
format: hex
safekeepers:
$ref: "#/components/schemas/TimelineSafekeepersInfo"

TimelineSafekeepersInfo:
type: object
required:
- tenant_id
- timeline_id
- generation
- safekeepers
properties:
tenant_id:
type: string
format: hex
timeline_id:
type: string
format: hex
generation:
type: integer
safekeepers:
type: array
items:
$ref: "#/components/schemas/TimelineSafekeeperInfo"

TimelineSafekeeperInfo:
type: object
required:
- id
- hostname
properties:
id:
type: integer
hostname:
type: string

SyntheticSizeResponse:
type: object
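For readers wiring a client against the new timeline response, a rough Rust mirror of the `TimelineSafekeepersInfo` / `TimelineSafekeeperInfo` schemas follows. The struct names come from the OpenAPI fragment above, but the field types are inferred (hex-formatted ids are kept as plain strings here); the real pageserver types likely use dedicated id newtypes, so treat this as an assumption-laden sketch.

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct TimelineSafekeeperInfo {
    id: u64,
    hostname: String,
}

#[derive(Serialize, Deserialize, Debug)]
struct TimelineSafekeepersInfo {
    // Hex-formatted ids are modelled as strings in this sketch.
    tenant_id: String,
    timeline_id: String,
    generation: u32,
    safekeepers: Vec<TimelineSafekeeperInfo>,
}

fn main() -> Result<(), serde_json::Error> {
    // Illustrative payload; ids and hostname are made up.
    let raw = r#"{
        "tenant_id": "11223344556677881122334455667788",
        "timeline_id": "99aabbccddeeff0099aabbccddeeff00",
        "generation": 1,
        "safekeepers": [{"id": 7, "hostname": "sk-7.example.internal"}]
    }"#;
    let info: TimelineSafekeepersInfo = serde_json::from_str(raw)?;
    println!("{info:?}");
    Ok(())
}
```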
@@ -2256,7 +2256,6 @@ async fn timeline_compact_handler(
let state = get_state(&request);

let mut flags = EnumSet::empty();
flags |= CompactFlags::NoYield; // run compaction to completion

if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;

@@ -2417,7 +2416,6 @@ async fn timeline_checkpoint_handler(
let state = get_state(&request);

let mut flags = EnumSet::empty();
flags |= CompactFlags::NoYield; // run compaction to completion
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
}

@@ -3776,7 +3774,7 @@ pub fn make_router(
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/mark_invisible",
|r| testing_api_handler("mark timeline invisible", r, timeline_mark_invisible_handler),
|r| api_handler( r, timeline_mark_invisible_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
use enum_map::{Enum as _, EnumMap};
use futures::Future;
use metrics::{
Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,

@@ -499,15 +499,6 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});

static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_flush_wait_upload_seconds",
"Time spent waiting for preceding uploads during layer flush",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});

static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",

@@ -2864,7 +2855,6 @@ pub(crate) struct TimelineMetrics {
timeline_id: String,
pub flush_time_histo: StorageTimeMetrics,
pub flush_delay_histo: StorageTimeMetrics,
pub flush_wait_upload_time_gauge: Gauge,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,

@@ -2916,9 +2906,6 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let compact_time_histo = StorageTimeMetrics::new(
StorageTimeOperation::Compact,
&tenant_id,

@@ -3046,7 +3033,6 @@ impl TimelineMetrics {
timeline_id,
flush_time_histo,
flush_delay_histo,
flush_wait_upload_time_gauge,
compact_time_histo,
create_images_time_histo,
logical_size_histo,

@@ -3096,14 +3082,6 @@ impl TimelineMetrics {
self.resident_physical_size_gauge.get()
}

pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) {
self.flush_wait_upload_time_gauge.add(duration);
crate::metrics::FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id])
.unwrap()
.add(duration);
}

/// Generates TIMELINE_LAYER labels for a persistent layer.
fn make_layer_labels(&self, layer_desc: &PersistentLayerDesc) -> [&str; 5] {
let level = match LayerMap::is_l0(&layer_desc.key_range, layer_desc.is_delta()) {

@@ -3207,7 +3185,6 @@ impl TimelineMetrics {
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
@@ -38,6 +38,7 @@ use std::panic::AssertUnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::FutureExt;
use once_cell::sync::Lazy;

@@ -584,18 +585,25 @@ pub async fn shutdown_tasks(
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
.await
.is_err()
{
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for task {} to shut down", task.name);
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
let _ = join_handle.await;
loop {
tokio::select! {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
_ = &mut join_handle => break,
_ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
}
}
info!("task {} completed", task.name);
}
} else {
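The shutdown_tasks change replaces a single unbounded `join_handle.await` with a loop that re-logs every 60 seconds while a task is still shutting down, so a stuck shutdown stays visible in the logs. A minimal standalone sketch of that pattern follows; the task, timings, and `println!` logging are illustrative stand-ins for the pageserver's real join handle and `info!` calls.

```rust
use std::time::Duration;
use tokio::task::JoinHandle;

// Re-log progress while waiting for a slow task, instead of going silent
// after the first "waiting" line.
async fn wait_and_complain(name: &str, mut join_handle: JoinHandle<()>) {
    const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
    const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);

    if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
        .await
        .is_err()
    {
        println!("waiting for task {name} to shut down");
        loop {
            tokio::select! {
                // The join result is intentionally ignored, as in the diff.
                _ = &mut join_handle => break,
                _ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => {
                    println!("still waiting for task {name} to shut down");
                }
            }
        }
        println!("task {name} completed");
    }
}

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        tokio::time::sleep(Duration::from_secs(2)).await;
    });
    wait_and_complain("slow-task", handle).await;
}
```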
@@ -3080,6 +3080,7 @@ impl Tenant {
let mut has_pending_l0 = false;
for timeline in compact_l0 {
let ctx = &ctx.with_scope_timeline(&timeline);
// NB: don't set CompactFlags::YieldForL0, since this is an L0-only compaction pass.
let outcome = timeline
.compact(cancel, CompactFlags::OnlyL0Compaction.into(), ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))

@@ -3097,14 +3098,9 @@ impl Tenant {
}
}

// Pass 2: image compaction and timeline offloading. If any timelines have accumulated
// more L0 layers, they may also be compacted here.
//
// NB: image compaction may yield if there is pending L0 compaction.
//
// TODO: it will only yield if there is pending L0 compaction on the same timeline. If a
// different timeline needs compaction, it won't. It should check `l0_compaction_trigger`.
// We leave this for a later PR.
// Pass 2: image compaction and timeline offloading. If any timelines have accumulated more
// L0 layers, they may also be compacted here. Image compaction will yield if there is
// pending L0 compaction on any tenant timeline.
//
// TODO: consider ordering timelines by some priority, e.g. time since last full compaction,
// amount of L1 delta debt or garbage, offload-eligible timelines first, etc.

@@ -3115,8 +3111,14 @@ impl Tenant {
}
let ctx = &ctx.with_scope_timeline(&timeline);

// Yield for L0 if the separate L0 pass is enabled (otherwise there's no point).
let mut flags = EnumSet::default();
if self.get_compaction_l0_first() {
flags |= CompactFlags::YieldForL0;
}

let mut outcome = timeline
.compact(cancel, EnumSet::default(), ctx)
.compact(cancel, flags, ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))
.await
.inspect_err(|err| self.maybe_trip_compaction_breaker(err))?;
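Pass 2 now builds its compaction flag set conditionally instead of always passing `EnumSet::default()`. The sketch below shows the same enumset pattern in isolation; the flag enum here is a stand-in for the pageserver's `CompactFlags` (its variants and the boolean input are illustrative), assuming the `enumset` crate.

```rust
use enumset::{EnumSet, EnumSetType};

// Stand-in for the pageserver's CompactFlags; variants are illustrative.
#[derive(EnumSetType, Debug)]
enum CompactFlags {
    YieldForL0,
    OnlyL0Compaction,
    ForceL0Compaction,
}

fn pass2_flags(l0_pass_enabled: bool) -> EnumSet<CompactFlags> {
    // Start from an empty set and only request yielding for L0 when the
    // separate L0-only pass is enabled, mirroring the diff above.
    let mut flags = EnumSet::default();
    if l0_pass_enabled {
        flags |= CompactFlags::YieldForL0;
    }
    flags
}

fn main() {
    assert!(pass2_flags(true).contains(CompactFlags::YieldForL0));
    assert!(pass2_flags(false).is_empty());
}
```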
@@ -6516,11 +6518,7 @@ mod tests {

tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;

let mut writer = tline.writer().await;

@@ -6537,11 +6535,7 @@ mod tests {

tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;

let mut writer = tline.writer().await;

@@ -6558,11 +6552,7 @@ mod tests {

tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;

let mut writer = tline.writer().await;

@@ -6579,11 +6569,7 @@ mod tests {

tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;

assert_eq!(

@@ -6666,9 +6652,7 @@ mod tests {
timeline.freeze_and_flush().await?;
if compact {
// this requires timeline to be &Arc<Timeline>
timeline
.compact(&cancel, CompactFlags::NoYield.into(), ctx)
.await?;
timeline.compact(&cancel, EnumSet::default(), ctx).await?;
}

// this doesn't really need to use the timeline_id target, but it is closer to what it

@@ -6995,7 +6979,6 @@ mod tests {
child_timeline.freeze_and_flush().await?;
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
child_timeline
.compact(&CancellationToken::new(), flags, &ctx)
.await?;

@@ -7374,9 +7357,7 @@ mod tests {

// Perform a cycle of flush, compact, and GC
tline.freeze_and_flush().await?;
tline
.compact(&cancel, CompactFlags::NoYield.into(), &ctx)
.await?;
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;

@@ -7705,7 +7686,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
} else {
EnumSet::empty()

@@ -7756,9 +7736,7 @@ mod tests {
let before_num_l0_delta_files =
tline.layers.read().await.layer_map()?.level0_deltas().len();

tline
.compact(&cancel, CompactFlags::NoYield.into(), &ctx)
.await?;
tline.compact(&cancel, EnumSet::default(), &ctx).await?;

let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();

@@ -7923,7 +7901,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,

@@ -8386,7 +8363,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,

@@ -8454,7 +8430,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,

@@ -11551,4 +11526,255 @@ mod tests {

Ok(())
}

#[cfg(feature = "testing")]
#[tokio::test]
async fn test_synthetic_size_calculation_with_invisible_branches() -> anyhow::Result<()> {
use pageserver_api::models::TimelineVisibilityState;

use crate::tenant::size::gather_inputs;

let tenant_conf = pageserver_api::models::TenantConfig {
// Ensure that we don't compute gc_cutoffs (which needs reading the layer files)
pitr_interval: Some(Duration::ZERO),
..Default::default()
};
let harness = TenantHarness::create_custom(
"test_synthetic_size_calculation_with_invisible_branches",
tenant_conf,
TenantId::generate(),
ShardIdentity::unsharded(),
Generation::new(0xdeadbeef),
)
.await?;
let (tenant, ctx) = harness.load().await;
let main_tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![],
vec![],
vec![],
Lsn(0x100),
)
.await?;

let snapshot1 = TimelineId::from_array(hex!("11223344556677881122334455667790"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot1,
Some(Lsn(0x20)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot2 = TimelineId::from_array(hex!("11223344556677881122334455667791"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot2,
Some(Lsn(0x30)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot3 = TimelineId::from_array(hex!("11223344556677881122334455667792"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot3,
Some(Lsn(0x40)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let limit = Arc::new(Semaphore::new(1));
let max_retention_period = None;
let mut logical_size_cache = HashMap::new();
let cause = LogicalSizeCalculationCause::EvictionTaskImitation;
let cancel = CancellationToken::new();

let inputs = gather_inputs(
&tenant,
&limit,
max_retention_period,
&mut logical_size_cache,
cause,
&cancel,
&ctx,
)
.instrument(info_span!(
"gather_inputs",
tenant_id = "unknown",
shard_id = "unknown",
))
.await?;
use crate::tenant::size::{LsnKind, ModelInputs, SegmentMeta};
use LsnKind::*;
use tenant_size_model::Segment;
let ModelInputs { mut segments, .. } = inputs;
segments.retain(|s| s.timeline_id == TIMELINE_ID);
for segment in segments.iter_mut() {
segment.segment.parent = None; // We don't care about the parent for the test
segment.segment.size = None; // We don't care about the size for the test
}
assert_eq!(
|
||||
segments,
|
||||
[
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x10,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchStart,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x20,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x30,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x40,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x100,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: GcCutOff,
|
||||
}, // we need to retain everything above the last branch point
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x100,
|
||||
size: None,
|
||||
needed: true,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchEnd,
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
main_tline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_timeline_invisible_state(
|
||||
TimelineVisibilityState::Invisible,
|
||||
)?;
|
||||
main_tline.remote_client.wait_completion().await?;
|
||||
let inputs = gather_inputs(
|
||||
&tenant,
|
||||
&limit,
|
||||
max_retention_period,
|
||||
&mut logical_size_cache,
|
||||
cause,
|
||||
&cancel,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"gather_inputs",
|
||||
tenant_id = "unknown",
|
||||
shard_id = "unknown",
|
||||
))
|
||||
.await?;
|
||||
let ModelInputs { mut segments, .. } = inputs;
|
||||
segments.retain(|s| s.timeline_id == TIMELINE_ID);
|
||||
for segment in segments.iter_mut() {
|
||||
segment.segment.parent = None; // We don't care about the parent for the test
|
||||
segment.segment.size = None; // We don't care about the size for the test
|
||||
}
|
||||
assert_eq!(
|
||||
segments,
|
||||
[
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x10,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchStart,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x20,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x30,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x40,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x40, // Branch end LSN == last branch point LSN
|
||||
size: None,
|
||||
needed: true,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchEnd,
|
||||
},
|
||||
]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1968,9 +1968,7 @@ impl RemoteTimelineClient {
|
||||
/// Pick next tasks from the queue, and start as many of them as possible without violating
|
||||
/// the ordering constraints.
|
||||
///
|
||||
/// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does.
|
||||
/// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
|
||||
/// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
|
||||
/// The number of in-progress tasks is limited by `Self::inprogress_tasks`; see `next_ready`.
|
||||
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
|
||||
while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
|
||||
debug!("starting op: {next_op}");
|
||||
@@ -2218,6 +2216,11 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
res
|
||||
}
|
||||
// TODO: this should wait for the deletion to be executed by the deletion queue.
|
||||
// Otherwise, the deletion may race with an upload and wrongfully delete a newer
|
||||
// file. Some of the above logic attempts to work around this; it should be replaced
|
||||
// by the upload queue ordering guarantees (see `can_bypass`). See:
|
||||
// <https://github.com/neondatabase/neon/issues/10283>.
|
||||
UploadOp::Delete(delete) => {
|
||||
if self.config.read().unwrap().block_deletions {
|
||||
let mut queue_locked = self.upload_queue.lock().unwrap();
|
||||
|
||||
@@ -33,7 +33,7 @@ pub struct ModelInputs {
|
||||
}
|
||||
|
||||
/// A [`Segment`], with some extra information for display purposes
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
|
||||
pub struct SegmentMeta {
|
||||
pub segment: Segment,
|
||||
pub timeline_id: TimelineId,
|
||||
@@ -248,6 +248,8 @@ pub(super) async fn gather_inputs(
|
||||
None
|
||||
};
|
||||
|
||||
let branch_is_invisible = timeline.is_invisible() == Some(true);
|
||||
|
||||
let lease_points = gc_info
|
||||
.leases
|
||||
.keys()
|
||||
@@ -271,7 +273,10 @@ pub(super) async fn gather_inputs(
|
||||
.map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
|
||||
if !branch_is_invisible {
|
||||
// Do not count lease points for invisible branches.
|
||||
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
|
||||
}
|
||||
|
||||
drop(gc_info);
|
||||
|
||||
@@ -287,7 +292,9 @@ pub(super) async fn gather_inputs(
|
||||
|
||||
// Add a point for the PITR cutoff
|
||||
let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
|
||||
if !branch_start_needed {
|
||||
if !branch_start_needed && !branch_is_invisible {
|
||||
// Only add the GcCutOff point when the timeline is visible; otherwise, do not compute the size for the LSN
|
||||
// range from the last branch point to the latest data.
|
||||
lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
|
||||
}
|
||||
|
||||
@@ -373,11 +380,19 @@ pub(super) async fn gather_inputs(
|
||||
}
|
||||
}
|
||||
|
||||
let branch_end_lsn = if branch_is_invisible {
|
||||
// If the branch is invisible, the branch end is the last requested LSN (likely a branch cutoff point).
|
||||
segments.last().unwrap().segment.lsn
|
||||
} else {
|
||||
// Otherwise, the branch end is the last record LSN.
|
||||
last_record_lsn.0
|
||||
};
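As a quick illustration of the branch-end choice just above, here is a minimal, std-only Rust sketch with LSNs reduced to plain u64 values; the helper name, its signature, and the standalone main are illustrative assumptions, not code from this change.

// Illustrative sketch only; the real code computes this inline as in the hunk above.
fn branch_end_lsn(branch_is_invisible: bool, gathered_lsns: &[u64], last_record_lsn: u64) -> u64 {
    if branch_is_invisible {
        // Invisible branch: stop the size model at the last LSN gathered so far
        // (typically the last branch point), so later data is not counted.
        *gathered_lsns.last().expect("at least one segment was gathered")
    } else {
        // Visible branch: extend to the timeline's last record LSN.
        last_record_lsn
    }
}

fn main() {
    assert_eq!(branch_end_lsn(true, &[0x10, 0x40], 0x100), 0x40);
    assert_eq!(branch_end_lsn(false, &[0x10, 0x40], 0x100), 0x100);
}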
|
||||
|
||||
// Current end of the timeline
|
||||
segments.push(SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: Some(parent),
|
||||
lsn: last_record_lsn.0,
|
||||
lsn: branch_end_lsn,
|
||||
size: None, // Filled in later, if necessary
|
||||
needed: true,
|
||||
},
|
||||
@@ -609,6 +624,7 @@ async fn calculate_logical_size(
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[test]
|
||||
fn verify_size_for_multiple_branches() {
|
||||
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
|
||||
@@ -766,6 +782,7 @@ fn verify_size_for_multiple_branches() {
|
||||
assert_eq!(inputs.calculate(), 37_851_408);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[test]
|
||||
fn verify_size_for_one_branch() {
|
||||
let doc = r#"
|
||||
|
||||
@@ -84,8 +84,8 @@ use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::layer_manager::LayerManager;
|
||||
use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::remote_timeline_client::index::{GcCompactionState, IndexPart};
|
||||
use super::remote_timeline_client::{RemoteTimelineClient, WaitCompletionError};
|
||||
use super::secondary::heatmap::HeatMapLayer;
|
||||
use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
|
||||
use super::tasks::log_compaction_error;
|
||||
@@ -870,9 +870,14 @@ pub(crate) enum CompactFlags {
|
||||
OnlyL0Compaction,
|
||||
EnhancedGcBottomMostCompaction,
|
||||
DryRun,
|
||||
/// Disables compaction yielding e.g. due to high L0 count. This is set e.g. when requesting
|
||||
/// compaction via HTTP API.
|
||||
NoYield,
|
||||
/// Makes image compaction yield if there's pending L0 compaction. This should always be used in
|
||||
/// the background compaction task, since we want to aggressively compact down L0 to bound
|
||||
/// read amplification.
|
||||
///
|
||||
/// It only makes sense to use this when `compaction_l0_first` is enabled (such that we yield to
|
||||
/// an L0 compaction pass), and without `OnlyL0Compaction` (L0 compaction shouldn't yield for L0
|
||||
/// compaction).
|
||||
YieldForL0,
|
||||
}
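To make the opt-in semantics of YieldForL0 concrete, a small, std-only sketch follows; the CompactionPass struct and its requested_via_http field are invented stand-ins (the real code carries these as CompactFlags in an EnumSet plus tenant config), so treat this as a sketch of the intent rather than the actual API.

// Illustrative only: which compaction passes should carry YieldForL0.
#[derive(Clone, Copy)]
struct CompactionPass {
    is_l0_only: bool,          // OnlyL0Compaction stand-in
    compaction_l0_first: bool, // tenant config stand-in
    requested_via_http: bool,  // manual requests never yield (NoYield stand-in)
}

// Only a background, non-L0 image/GC pass with L0-first compaction enabled opts in.
fn should_yield_for_l0(pass: CompactionPass) -> bool {
    !pass.is_l0_only && pass.compaction_l0_first && !pass.requested_via_http
}

fn main() {
    let background = CompactionPass { is_l0_only: false, compaction_l0_first: true, requested_via_http: false };
    let http_request = CompactionPass { requested_via_http: true, ..background };
    assert!(should_yield_for_l0(background));
    assert!(!should_yield_for_l0(http_request));
}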
|
||||
|
||||
#[serde_with::serde_as]
|
||||
@@ -1891,18 +1896,19 @@ impl Timeline {
|
||||
// out by other background tasks (including image compaction). We request this via
|
||||
// `BackgroundLoopKind::L0Compaction`.
|
||||
//
|
||||
// If this is a regular compaction pass, and L0-only compaction is enabled in the config,
|
||||
// then we should yield for immediate L0 compaction if necessary while we're waiting for the
|
||||
// background task semaphore. There's no point yielding otherwise, since we'd just end up
|
||||
// right back here.
|
||||
// Yield for pending L0 compaction while waiting for the semaphore.
|
||||
let is_l0_only = options.flags.contains(CompactFlags::OnlyL0Compaction);
|
||||
let semaphore_kind = match is_l0_only && self.get_compaction_l0_semaphore() {
|
||||
true => BackgroundLoopKind::L0Compaction,
|
||||
false => BackgroundLoopKind::Compaction,
|
||||
};
|
||||
let yield_for_l0 = !is_l0_only
|
||||
&& self.get_compaction_l0_first()
|
||||
&& !options.flags.contains(CompactFlags::NoYield);
|
||||
let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
|
||||
if yield_for_l0 {
|
||||
// If this is an L0 pass, it doesn't make sense to yield for L0.
|
||||
debug_assert!(!is_l0_only, "YieldForL0 during L0 pass");
|
||||
// If `compaction_l0_first` is disabled, there's no point yielding.
|
||||
debug_assert!(self.get_compaction_l0_first(), "YieldForL0 without L0 pass");
|
||||
}
|
||||
|
||||
let acquire = async move {
|
||||
let guard = self.compaction_lock.lock().await;
|
||||
@@ -2209,6 +2215,10 @@ impl Timeline {
|
||||
self.remote_client.is_archived()
|
||||
}
|
||||
|
||||
pub(crate) fn is_invisible(&self) -> Option<bool> {
|
||||
self.remote_client.is_invisible()
|
||||
}
|
||||
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
self.current_state() == TimelineState::Stopping
|
||||
}
|
||||
@@ -2562,14 +2572,6 @@ impl Timeline {
|
||||
Some(max(l0_flush_stall_threshold, compaction_threshold))
|
||||
}
|
||||
|
||||
fn get_l0_flush_wait_upload(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.l0_flush_wait_upload
|
||||
.unwrap_or(self.conf.default_tenant_conf.l0_flush_wait_upload)
|
||||
}
|
||||
|
||||
fn get_image_creation_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -4591,27 +4593,6 @@ impl Timeline {
|
||||
// release lock on 'layers'
|
||||
};
|
||||
|
||||
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
|
||||
// This makes us refuse ingest until the new layers have been persisted to the remote
|
||||
// TODO: remove this, and rely on l0_flush_{delay,stall}_threshold instead.
|
||||
if self.get_l0_flush_wait_upload() {
|
||||
let start = Instant::now();
|
||||
self.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
WaitCompletionError::UploadQueueShutDownOrStopped
|
||||
| WaitCompletionError::NotInitialized(
|
||||
NotInitialized::ShuttingDown | NotInitialized::Stopped,
|
||||
) => FlushLayerError::Cancelled,
|
||||
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
|
||||
FlushLayerError::Other(anyhow!(e).into())
|
||||
}
|
||||
})?;
|
||||
let duration = start.elapsed().as_secs_f64();
|
||||
self.metrics.flush_wait_upload_time_gauge_add(duration);
|
||||
}
|
||||
|
||||
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
|
||||
// a compaction can delete the file and then it won't be available for uploads any more.
|
||||
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
|
||||
|
||||
@@ -394,8 +394,8 @@ impl GcCompactionQueue {
|
||||
if job.dry_run {
|
||||
flags |= CompactFlags::DryRun;
|
||||
}
|
||||
if options.flags.contains(CompactFlags::NoYield) {
|
||||
flags |= CompactFlags::NoYield;
|
||||
if options.flags.contains(CompactFlags::YieldForL0) {
|
||||
flags |= CompactFlags::YieldForL0;
|
||||
}
|
||||
let options = CompactOptions {
|
||||
flags,
|
||||
@@ -983,7 +983,7 @@ impl Timeline {
|
||||
|
||||
// Yield if we have pending L0 compaction. The scheduler will do another pass.
|
||||
if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
|
||||
&& !options.flags.contains(CompactFlags::NoYield)
|
||||
&& options.flags.contains(CompactFlags::YieldForL0)
|
||||
{
|
||||
info!("image/ancestor compaction yielding for L0 compaction");
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
@@ -1028,7 +1028,7 @@ impl Timeline {
|
||||
.load()
|
||||
.as_ref()
|
||||
.clone(),
|
||||
!options.flags.contains(CompactFlags::NoYield),
|
||||
options.flags.contains(CompactFlags::YieldForL0),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
@@ -2635,7 +2635,7 @@ impl Timeline {
|
||||
) -> Result<CompactionOutcome, CompactionError> {
|
||||
let sub_compaction = options.sub_compaction;
|
||||
let job = GcCompactJob::from_compact_options(options.clone());
|
||||
let no_yield = options.flags.contains(CompactFlags::NoYield);
|
||||
let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
|
||||
if sub_compaction {
|
||||
info!(
|
||||
"running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
|
||||
@@ -2650,7 +2650,7 @@ impl Timeline {
|
||||
idx + 1,
|
||||
jobs_len
|
||||
);
|
||||
self.compact_with_gc_inner(cancel, job, ctx, no_yield)
|
||||
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
|
||||
.await?;
|
||||
}
|
||||
if jobs_len == 0 {
|
||||
@@ -2658,7 +2658,8 @@ impl Timeline {
|
||||
}
|
||||
return Ok(CompactionOutcome::Done);
|
||||
}
|
||||
self.compact_with_gc_inner(cancel, job, ctx, no_yield).await
|
||||
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn compact_with_gc_inner(
|
||||
@@ -2666,7 +2667,7 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
job: GcCompactJob,
|
||||
ctx: &RequestContext,
|
||||
no_yield: bool,
|
||||
yield_for_l0: bool,
|
||||
) -> Result<CompactionOutcome, CompactionError> {
|
||||
// Block other compaction/GC tasks from running for now. GC-compaction could run along
|
||||
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
|
||||
@@ -2936,18 +2937,15 @@ impl Timeline {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
if !no_yield {
|
||||
let should_yield = self
|
||||
let should_yield = yield_for_l0
|
||||
&& self
|
||||
.l0_compaction_trigger
|
||||
.notified()
|
||||
.now_or_never()
|
||||
.is_some();
|
||||
if should_yield {
|
||||
tracing::info!(
|
||||
"preempt gc-compaction when downloading layers: too many L0 layers"
|
||||
);
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
if should_yield {
|
||||
tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
let resident_layer = layer
|
||||
.download_and_keep_resident(ctx)
|
||||
@@ -3081,21 +3079,17 @@ impl Timeline {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
if !no_yield {
|
||||
keys_processed += 1;
|
||||
if keys_processed % 1000 == 0 {
|
||||
let should_yield = self
|
||||
.l0_compaction_trigger
|
||||
.notified()
|
||||
.now_or_never()
|
||||
.is_some();
|
||||
if should_yield {
|
||||
tracing::info!(
|
||||
"preempt gc-compaction in the main loop: too many L0 layers"
|
||||
);
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
}
|
||||
keys_processed += 1;
|
||||
let should_yield = yield_for_l0
|
||||
&& keys_processed % 1000 == 0
|
||||
&& self
|
||||
.l0_compaction_trigger
|
||||
.notified()
|
||||
.now_or_never()
|
||||
.is_some();
|
||||
if should_yield {
|
||||
tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
if self.shard_identity.is_key_disposable(&key) {
|
||||
// If this shard does not need to store this key, simply skip it.
|
||||
|
||||
@@ -235,7 +235,7 @@ pub(super) async fn prepare(
|
||||
return Err(NoAncestor);
|
||||
}
|
||||
|
||||
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
|
||||
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn, behavior)?;
|
||||
|
||||
if let DetachBehavior::MultiLevelAndNoReparent = behavior {
|
||||
// If the ancestor has an ancestor, we might be able to fast-path detach it if the current ancestor does not have any data written/used by the detaching timeline.
|
||||
@@ -249,7 +249,13 @@ pub(super) async fn prepare(
|
||||
ancestor_lsn = ancestor.ancestor_lsn; // Get the LSN first before resetting the `ancestor` variable
|
||||
ancestor = ancestor_of_ancestor;
|
||||
// TODO: do we still need to check if we don't want to reparent?
|
||||
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
|
||||
check_no_archived_children_of_ancestor(
|
||||
tenant,
|
||||
detached,
|
||||
&ancestor,
|
||||
ancestor_lsn,
|
||||
behavior,
|
||||
)?;
|
||||
}
|
||||
} else if ancestor.ancestor_timeline.is_some() {
|
||||
// non-technical requirement; we could flatten N ancestors just as easily but we chose
|
||||
@@ -1156,31 +1162,44 @@ fn check_no_archived_children_of_ancestor(
|
||||
detached: &Arc<Timeline>,
|
||||
ancestor: &Arc<Timeline>,
|
||||
ancestor_lsn: Lsn,
|
||||
detach_behavior: DetachBehavior,
|
||||
) -> Result<(), Error> {
|
||||
let timelines = tenant.timelines.lock().unwrap();
|
||||
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
|
||||
for timeline in reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn) {
|
||||
if timeline.is_archived() == Some(true) {
|
||||
return Err(Error::Archived(timeline.timeline_id));
|
||||
}
|
||||
}
|
||||
for timeline_offloaded in timelines_offloaded.values() {
|
||||
if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
|
||||
continue;
|
||||
}
|
||||
// This forbids the detach ancestor feature if flattened timelines are present,
|
||||
// even if the ancestor_lsn is from after the branchpoint of the detached timeline.
|
||||
// But as per current design, we don't record the ancestor_lsn of flattened timelines.
|
||||
// This is a bit unfortunate, but as of writing this we don't support flattening
|
||||
// anyway. Maybe we can evolve the data model in the future.
|
||||
if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
|
||||
let is_earlier = retain_lsn <= ancestor_lsn;
|
||||
if !is_earlier {
|
||||
continue;
|
||||
match detach_behavior {
|
||||
DetachBehavior::NoAncestorAndReparent => {
|
||||
let timelines = tenant.timelines.lock().unwrap();
|
||||
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
|
||||
|
||||
for timeline in
|
||||
reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn)
|
||||
{
|
||||
if timeline.is_archived() == Some(true) {
|
||||
return Err(Error::Archived(timeline.timeline_id));
|
||||
}
|
||||
}
|
||||
|
||||
for timeline_offloaded in timelines_offloaded.values() {
|
||||
if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
|
||||
continue;
|
||||
}
|
||||
// This forbids the detach ancestor feature if flattened timelines are present,
|
||||
// even if the ancestor_lsn is from after the branchpoint of the detached timeline.
|
||||
// But as per current design, we don't record the ancestor_lsn of flattened timelines.
|
||||
// This is a bit unfortunate, but as of writing this we don't support flattening
|
||||
// anyway. Maybe we can evolve the data model in the future.
|
||||
if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
|
||||
let is_earlier = retain_lsn <= ancestor_lsn;
|
||||
if !is_earlier {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return Err(Error::Archived(timeline_offloaded.timeline_id));
|
||||
}
|
||||
}
|
||||
return Err(Error::Archived(timeline_offloaded.timeline_id));
|
||||
DetachBehavior::MultiLevelAndNoReparent => {
|
||||
// We don't need to check anything if the user requested to not reparent.
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
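The restructuring above gates the archived-children checks on the detach behavior. A minimal, std-only sketch of that guard follows; the helper name is invented and the enum is trimmed to the two variants visible in this hunk.

// Illustrative only: archived-children checks apply only when reparenting is requested.
enum DetachBehavior { NoAncestorAndReparent, MultiLevelAndNoReparent }

fn needs_archived_children_check(behavior: &DetachBehavior) -> bool {
    matches!(behavior, DetachBehavior::NoAncestorAndReparent)
}

fn main() {
    assert!(needs_archived_children_check(&DetachBehavior::NoAncestorAndReparent));
    assert!(!needs_archived_children_check(&DetachBehavior::MultiLevelAndNoReparent));
}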
|
||||
|
||||
|
||||
@@ -647,18 +647,25 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
return found;
|
||||
}
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 16
|
||||
static PGIOAlignedBlock voidblock = {0};
|
||||
#else
|
||||
static PGAlignedBlock voidblock = {0};
|
||||
#endif
|
||||
#define SCRIBBLEPAGE (&voidblock.data)
|
||||
|
||||
/*
|
||||
* Try to read pages from local cache.
|
||||
* Returns the number of pages read from the local cache, and sets bits in
|
||||
* 'read' for the pages which were read. This may scribble over buffers not
|
||||
* marked in 'read', so be careful with operation ordering.
|
||||
* 'mask' for the pages which were read. This may scribble over buffers not
|
||||
* marked in 'mask', so be careful with operation ordering.
|
||||
*
|
||||
* In case of error local file cache is disabled (lfc->limit is set to zero),
|
||||
* and -1 is returned. Note that 'read' and the buffers may be touched and in
|
||||
* an otherwise invalid state.
|
||||
* and -1 is returned.
|
||||
*
|
||||
* If the mask argument is supplied, bits will be set at the offsets of pages
|
||||
* that were present and read from the LFC.
|
||||
* If the mask argument is supplied, we'll only try to read those pages which
|
||||
* don't have their bits set on entry. At exit, pages which were successfully
|
||||
* read from LFC will have their bits set.
|
||||
*/
|
||||
int
|
||||
lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
@@ -693,23 +700,43 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
while (nblocks > 0)
|
||||
{
|
||||
struct iovec iov[PG_IOV_MAX];
|
||||
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
|
||||
int8 chunk_mask[BLOCKS_PER_CHUNK / 8] = {0};
|
||||
int chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1));
|
||||
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
|
||||
int iteration_hits = 0;
|
||||
int iteration_misses = 0;
|
||||
uint64 io_time_us = 0;
|
||||
int n_blocks_to_read = 0;
|
||||
int n_blocks_to_read = 0;
|
||||
int iov_last_used = 0;
|
||||
int first_block_in_chunk_read = -1;
|
||||
ConditionVariable* cv;
|
||||
|
||||
Assert(blocks_in_chunk > 0);
|
||||
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
|
||||
iov[i].iov_base = buffers[buf_offset + i];
|
||||
iov[i].iov_len = BLCKSZ;
|
||||
BITMAP_CLR(mask, buf_offset + i);
|
||||
/* mask not set = we must do work */
|
||||
if (!BITMAP_ISSET(mask, buf_offset + i))
|
||||
{
|
||||
iov[i].iov_base = buffers[buf_offset + i];
|
||||
n_blocks_to_read++;
|
||||
iov_last_used = i + 1;
|
||||
|
||||
if (first_block_in_chunk_read == -1)
|
||||
{
|
||||
first_block_in_chunk_read = i;
|
||||
}
|
||||
}
|
||||
/* mask set = we must do no work */
|
||||
else
|
||||
{
|
||||
/* don't scribble on pages we weren't requested to write to */
|
||||
iov[i].iov_base = SCRIBBLEPAGE;
|
||||
}
|
||||
}
|
||||
|
||||
/* shortcut IO */
|
||||
if (n_blocks_to_read == 0)
|
||||
{
|
||||
buf_offset += blocks_in_chunk;
|
||||
@@ -718,6 +745,12 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* The effective iov size must be >= the number of blocks we're about
|
||||
* to read.
|
||||
*/
|
||||
Assert(iov_last_used - first_block_in_chunk_read >= n_blocks_to_read);
|
||||
|
||||
tag.blockNum = blkno - chunk_offs;
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
cv = &lfc_ctl->cv[hash % N_COND_VARS];
|
||||
@@ -762,10 +795,15 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
generation = lfc_ctl->generation;
|
||||
entry_offset = entry->offset;
|
||||
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
|
||||
{
|
||||
FileCacheBlockState state = UNAVAILABLE;
|
||||
bool sleeping = false;
|
||||
|
||||
/* no need to work on something we're not interested in */
|
||||
if (BITMAP_ISSET(mask, buf_offset + i))
|
||||
continue;
|
||||
|
||||
while (lfc_ctl->generation == generation)
|
||||
{
|
||||
state = GET_STATE(entry, chunk_offs + i);
|
||||
@@ -789,7 +827,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
if (state == AVAILABLE)
|
||||
{
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
BITMAP_SET(chunk_mask, i);
|
||||
iteration_hits++;
|
||||
}
|
||||
else
|
||||
@@ -801,16 +839,34 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
if (iteration_hits != 0)
|
||||
{
|
||||
/* chunk offset (# of pages) into the LFC file */
|
||||
off_t first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK;
|
||||
int nwrite = iov_last_used - first_block_in_chunk_read;
|
||||
/* offset of first IOV */
|
||||
first_read_offset += chunk_offs + first_block_in_chunk_read;
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
rc = preadv(lfc_desc, iov, blocks_in_chunk,
|
||||
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
|
||||
|
||||
/* Read only the blocks we're interested in, limiting the read to the selected iov range */
|
||||
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
|
||||
nwrite, first_read_offset * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != (BLCKSZ * blocks_in_chunk))
|
||||
if (rc != (BLCKSZ * nwrite))
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* We successfully read the pages we know were valid when we
|
||||
* started reading; now mark those pages as read
|
||||
*/
|
||||
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
|
||||
{
|
||||
if (BITMAP_ISSET(chunk_mask, i))
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
}
|
||||
}
|
||||
|
||||
/* Place entry to the head of LRU list */
|
||||
|
||||
@@ -1142,37 +1142,23 @@ pageserver_try_receive(shardno_t shard_no)
|
||||
NeonResponse *resp;
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
PGconn *pageserver_conn = shard->conn;
|
||||
/* read response */
|
||||
int rc;
|
||||
int rc;
|
||||
|
||||
if (shard->state != PS_Connected)
|
||||
return NULL;
|
||||
|
||||
Assert(pageserver_conn);
|
||||
|
||||
while (true)
|
||||
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
|
||||
if (rc == 0)
|
||||
{
|
||||
if (PQisBusy(shard->conn))
|
||||
if (!PQconsumeInput(shard->conn))
|
||||
{
|
||||
WaitEvent event;
|
||||
if (WaitEventSetWait(shard->wes_read, 0, &event, 1,
|
||||
WAIT_EVENT_NEON_PS_READ) != 1
|
||||
|| (event.events & WL_SOCKET_READABLE) == 0)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
|
||||
if (rc == 0)
|
||||
{
|
||||
if (!PQconsumeInput(shard->conn))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
break;
|
||||
}
|
||||
|
||||
if (rc == 0)
|
||||
return NULL;
|
||||
else if (rc > 0)
|
||||
|
||||
@@ -315,7 +315,7 @@ static inline bool
|
||||
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
void *buffer)
|
||||
{
|
||||
bits8 rv = 1;
|
||||
bits8 rv = 0;
|
||||
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -99,7 +99,7 @@ static char *hexdump_page(char *page);
|
||||
|
||||
#define IS_LOCAL_REL(reln) (\
|
||||
NInfoGetDbOid(InfoFromSMgrRel(reln)) != 0 && \
|
||||
NInfoGetRelNumber(InfoFromSMgrRel(reln)) > FirstNormalObjectId \
|
||||
NInfoGetRelNumber(InfoFromSMgrRel(reln)) >= FirstNormalObjectId \
|
||||
)
|
||||
|
||||
const int SmgrTrace = DEBUG5;
|
||||
@@ -1081,6 +1081,9 @@ prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_r
|
||||
* pageserver. If NULL, we utilize the lastWrittenLsn -infrastructure
|
||||
* to calculate the LSNs to send.
|
||||
*
|
||||
* Bits set in *mask (if present) indicate pages already read; i.e. pages we
|
||||
* can skip in this process.
|
||||
*
|
||||
* When performing a prefetch rather than a synchronous request,
|
||||
* is_prefetch==true. Currently, it only affects how the request is accounted
|
||||
* in the perf counters.
|
||||
@@ -1126,7 +1129,7 @@ Retry:
|
||||
uint64 ring_index;
|
||||
neon_request_lsns *lsns;
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
|
||||
if (frlsns)
|
||||
@@ -2381,7 +2384,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
LSN_FORMAT_ARGS(last_written_lsn),
|
||||
LSN_FORMAT_ARGS(flushlsn));
|
||||
XLogFlush(last_written_lsn);
|
||||
flushlsn = last_written_lsn;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2397,18 +2399,35 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
* requesting the latest page, by setting request LSN to
|
||||
* UINT64_MAX.
|
||||
*
|
||||
* Remember the current LSN, however, so that we can later
|
||||
* correctly determine if the response to the request is still
|
||||
* valid. The most up-to-date LSN we could use for that purpose
|
||||
* would be the current insert LSN, but to avoid the overhead of
|
||||
* looking it up, use 'flushlsn' instead. This relies on the
|
||||
* assumption that if the page was modified since the last WAL
|
||||
* flush, it should still be in the buffer cache, and we
|
||||
* wouldn't be requesting it.
|
||||
* effective_request_lsn is used to check that the received response is still valid.
* In the case of a primary node it is the last-written LSN. Originally we used flush_lsn here,
* but that is not correct. Consider the following scenario:
|
||||
* 1. Backend A wants to prefetch block X
|
||||
* 2. Backend A checks that block X is not present in the shared buffer cache
|
||||
* 3. Backend A calls prefetch_do_request, which calls neon_get_request_lsns
|
||||
* 4. neon_get_request_lsns obtains LwLSN=11 for the block
|
||||
* 5. Backend B downloads block X, updates and wallogs it with LSN=13
|
||||
* 6. Block X is once again evicted from shared buffers, its LwLSN is set to LSN=13
|
||||
* 7. Backend A is still executing in neon_get_request_lsns(). It calls 'flushlsn = GetFlushRecPtr();'.
|
||||
* Let's say that it is LSN=14
|
||||
* 8. Backend A uses LSN=14 as effective_lsn in the prefetch slot. The request stored in the slot is
|
||||
* [not_modified_since=11, effective_request_lsn=14]
|
||||
* 9. Backend A sends the prefetch request, pageserver processes it, and sends response.
|
||||
* The last LSN that the pageserver had processed was LSN=12, so the page image in the response is valid at LSN=12.
|
||||
* 10. Backend A calls smgrread() for page X with LwLSN=13
|
||||
* 11. Backend A finds in the prefetch ring the response for the prefetch request with [not_modified_since=11, effective_request_lsn=14],
* so it satisfies the neon_prefetch_response_usable condition.
|
||||
*
|
||||
* Things go wrong in step 7-8, when [not_modified_since=11, effective_request_lsn=14] is determined for the request.
|
||||
* That is incorrect, because the page has in fact been modified at LSN=13. The invariant is that for any request,
|
||||
* there should not be any modifications to a page between its not_modified_since and (effective_)request_lsn values.
|
||||
*
|
||||
* The problem can be fixed by calling GetFlushRecPtr() before checking if the page is in the buffer cache.
* But we can't do that within smgrprefetch(); the caller would need to be modified.
|
||||
*/
|
||||
result->request_lsn = UINT64_MAX;
|
||||
result->not_modified_since = last_written_lsn;
|
||||
result->effective_request_lsn = flushlsn;
|
||||
result->effective_request_lsn = last_written_lsn;
|
||||
}
|
||||
}
|
||||
}
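The scenario above boils down to one invariant: no page modification may fall between not_modified_since and effective_request_lsn. A small, self-contained Rust sketch of that check, with LSNs as plain u64 and an invented request_is_sound helper (not part of the extension), is below.

// Illustrative only: checks the invariant from the comment above on simplified types.
struct RequestLsns {
    not_modified_since: u64,
    effective_request_lsn: u64,
}

// The request is only sound if no modification happened in
// (not_modified_since, effective_request_lsn]; `page_modified_at` stands in
// for the modification from step 5 of the scenario.
fn request_is_sound(req: &RequestLsns, page_modified_at: u64) -> bool {
    !(page_modified_at > req.not_modified_since && page_modified_at <= req.effective_request_lsn)
}

fn main() {
    // Old behaviour (effective_request_lsn = flushlsn = 14): the modification at 13
    // falls inside the window, so a stale response could wrongly be reused.
    assert!(!request_is_sound(&RequestLsns { not_modified_since: 11, effective_request_lsn: 14 }, 13));
    // New behaviour (effective_request_lsn = last-written LSN = 11): the window is empty.
    assert!(request_is_sound(&RequestLsns { not_modified_since: 11, effective_request_lsn: 11 }, 13));
}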
|
||||
@@ -2467,11 +2486,8 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
||||
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
|
||||
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
|
||||
* we remember `effective_request_lsn` separately. In a primary,
|
||||
* `effective_request_lsn` is the last flush WAL position when the request
|
||||
* was sent to the pageserver. That's logically the LSN that we are
|
||||
* requesting the page at, but we send UINT64_MAX to the pageserver so
|
||||
* that if the GC horizon advances past that position, we still get a
|
||||
* valid response instead of an error.
|
||||
* `effective_request_lsn` is the same as `not_modified_since`.
|
||||
* See comments in neon_get_request_lsns why we can not use last flush WAL position here.
|
||||
*
|
||||
* To determine whether a response to a GetPage request issued earlier is
|
||||
* still valid to satisfy a new page read, we look at the
|
||||
@@ -3026,9 +3042,6 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
|
||||
tag.blockNum = blocknum;
|
||||
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_present[i] = ~(lfc_present[i]);
|
||||
|
||||
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
|
||||
lfc_present, true);
|
||||
|
||||
@@ -3134,6 +3147,15 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* Read N pages at a specific LSN.
|
||||
*
|
||||
* *mask is set for pages read at a previous point in time, and which we
|
||||
* should not touch, nor overwrite.
|
||||
* New bits should be set in *mask for the pages we successfully read.
|
||||
*
|
||||
* The offsets in request_lsns, buffers, and mask are linked.
|
||||
*/
|
||||
static void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
|
||||
@@ -3186,7 +3208,7 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block
|
||||
neon_request_lsns *reqlsns = &request_lsns[i];
|
||||
TimestampTz start_ts, end_ts;
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
|
||||
start_ts = GetCurrentTimestamp();
|
||||
@@ -3485,9 +3507,7 @@ static void
|
||||
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
void **buffers, BlockNumber nblocks)
|
||||
{
|
||||
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
|
||||
bits8 lfc_hits[PG_IOV_MAX / 8];
|
||||
bits8 read[PG_IOV_MAX / 8];
|
||||
bits8 read_pages[PG_IOV_MAX / 8];
|
||||
neon_request_lsns request_lsns[PG_IOV_MAX];
|
||||
int lfc_result;
|
||||
int prefetch_result;
|
||||
@@ -3519,19 +3539,18 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks);
|
||||
|
||||
memset(read_pages, 0, sizeof(read_pages));
|
||||
|
||||
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
|
||||
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum,
|
||||
blocknum, request_lsns, nblocks,
|
||||
buffers, read_pages);
|
||||
|
||||
if (prefetch_result == nblocks)
|
||||
return;
|
||||
|
||||
/* invert the result: exclude prefetched blocks */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_hits[i] = ~prefetch_hits[i];
|
||||
|
||||
/* Try to read from local file cache */
|
||||
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
|
||||
nblocks, lfc_hits);
|
||||
nblocks, read_pages);
|
||||
|
||||
if (lfc_result > 0)
|
||||
MyNeonCounters->file_cache_hits_total += lfc_result;
|
||||
@@ -3540,21 +3559,8 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
if (prefetch_result + lfc_result == nblocks)
|
||||
return;
|
||||
|
||||
if (lfc_result <= 0)
|
||||
{
|
||||
/* can't use the LFC result, so read all blocks from PS */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
read[i] = ~prefetch_hits[i];
|
||||
}
|
||||
else
|
||||
{
|
||||
/* invert the result: exclude blocks read from lfc */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
|
||||
}
|
||||
|
||||
neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
|
||||
buffers, nblocks, read);
|
||||
buffers, nblocks, read_pages);
|
||||
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
|
||||
40
poetry.lock
generated
@@ -3111,30 +3111,30 @@ six = "*"
|
||||
|
||||
[[package]]
|
||||
name = "ruff"
|
||||
version = "0.7.0"
|
||||
version = "0.11.2"
|
||||
description = "An extremely fast Python linter and code formatter, written in Rust."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
groups = ["dev"]
|
||||
files = [
|
||||
{file = "ruff-0.7.0-py3-none-linux_armv6l.whl", hash = "sha256:0cdf20c2b6ff98e37df47b2b0bd3a34aaa155f59a11182c1303cce79be715628"},
|
||||
{file = "ruff-0.7.0-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:496494d350c7fdeb36ca4ef1c9f21d80d182423718782222c29b3e72b3512737"},
|
||||
{file = "ruff-0.7.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:214b88498684e20b6b2b8852c01d50f0651f3cc6118dfa113b4def9f14faaf06"},
|
||||
{file = "ruff-0.7.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:630fce3fefe9844e91ea5bbf7ceadab4f9981f42b704fae011bb8efcaf5d84be"},
|
||||
{file = "ruff-0.7.0-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:211d877674e9373d4bb0f1c80f97a0201c61bcd1e9d045b6e9726adc42c156aa"},
|
||||
{file = "ruff-0.7.0-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:194d6c46c98c73949a106425ed40a576f52291c12bc21399eb8f13a0f7073495"},
|
||||
{file = "ruff-0.7.0-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:82c2579b82b9973a110fab281860403b397c08c403de92de19568f32f7178598"},
|
||||
{file = "ruff-0.7.0-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9af971fe85dcd5eaed8f585ddbc6bdbe8c217fb8fcf510ea6bca5bdfff56040e"},
|
||||
{file = "ruff-0.7.0-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b641c7f16939b7d24b7bfc0be4102c56562a18281f84f635604e8a6989948914"},
|
||||
{file = "ruff-0.7.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d71672336e46b34e0c90a790afeac8a31954fd42872c1f6adaea1dff76fd44f9"},
|
||||
{file = "ruff-0.7.0-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ab7d98c7eed355166f367597e513a6c82408df4181a937628dbec79abb2a1fe4"},
|
||||
{file = "ruff-0.7.0-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:1eb54986f770f49edb14f71d33312d79e00e629a57387382200b1ef12d6a4ef9"},
|
||||
{file = "ruff-0.7.0-py3-none-musllinux_1_2_i686.whl", hash = "sha256:dc452ba6f2bb9cf8726a84aa877061a2462afe9ae0ea1d411c53d226661c601d"},
|
||||
{file = "ruff-0.7.0-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:4b406c2dce5be9bad59f2de26139a86017a517e6bcd2688da515481c05a2cb11"},
|
||||
{file = "ruff-0.7.0-py3-none-win32.whl", hash = "sha256:f6c968509f767776f524a8430426539587d5ec5c662f6addb6aa25bc2e8195ec"},
|
||||
{file = "ruff-0.7.0-py3-none-win_amd64.whl", hash = "sha256:ff4aabfbaaba880e85d394603b9e75d32b0693152e16fa659a3064a85df7fce2"},
|
||||
{file = "ruff-0.7.0-py3-none-win_arm64.whl", hash = "sha256:10842f69c245e78d6adec7e1db0a7d9ddc2fff0621d730e61657b64fa36f207e"},
|
||||
{file = "ruff-0.7.0.tar.gz", hash = "sha256:47a86360cf62d9cd53ebfb0b5eb0e882193fc191c6d717e8bef4462bc3b9ea2b"},
|
||||
{file = "ruff-0.11.2-py3-none-linux_armv6l.whl", hash = "sha256:c69e20ea49e973f3afec2c06376eb56045709f0212615c1adb0eda35e8a4e477"},
|
||||
{file = "ruff-0.11.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:2c5424cc1c4eb1d8ecabe6d4f1b70470b4f24a0c0171356290b1953ad8f0e272"},
|
||||
{file = "ruff-0.11.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:ecf20854cc73f42171eedb66f006a43d0a21bfb98a2523a809931cda569552d9"},
|
||||
{file = "ruff-0.11.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0c543bf65d5d27240321604cee0633a70c6c25c9a2f2492efa9f6d4b8e4199bb"},
|
||||
{file = "ruff-0.11.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:20967168cc21195db5830b9224be0e964cc9c8ecf3b5a9e3ce19876e8d3a96e3"},
|
||||
{file = "ruff-0.11.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:955a9ce63483999d9f0b8f0b4a3ad669e53484232853054cc8b9d51ab4c5de74"},
|
||||
{file = "ruff-0.11.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:86b3a27c38b8fce73bcd262b0de32e9a6801b76d52cdb3ae4c914515f0cef608"},
|
||||
{file = "ruff-0.11.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a3b66a03b248c9fcd9d64d445bafdf1589326bee6fc5c8e92d7562e58883e30f"},
|
||||
{file = "ruff-0.11.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0397c2672db015be5aa3d4dac54c69aa012429097ff219392c018e21f5085147"},
|
||||
{file = "ruff-0.11.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:869bcf3f9abf6457fbe39b5a37333aa4eecc52a3b99c98827ccc371a8e5b6f1b"},
|
||||
{file = "ruff-0.11.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:2a2b50ca35457ba785cd8c93ebbe529467594087b527a08d487cf0ee7b3087e9"},
|
||||
{file = "ruff-0.11.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:7c69c74bf53ddcfbc22e6eb2f31211df7f65054bfc1f72288fc71e5f82db3eab"},
|
||||
{file = "ruff-0.11.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6e8fb75e14560f7cf53b15bbc55baf5ecbe373dd5f3aab96ff7aa7777edd7630"},
|
||||
{file = "ruff-0.11.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:842a472d7b4d6f5924e9297aa38149e5dcb1e628773b70e6387ae2c97a63c58f"},
|
||||
{file = "ruff-0.11.2-py3-none-win32.whl", hash = "sha256:aca01ccd0eb5eb7156b324cfaa088586f06a86d9e5314b0eb330cb48415097cc"},
|
||||
{file = "ruff-0.11.2-py3-none-win_amd64.whl", hash = "sha256:3170150172a8f994136c0c66f494edf199a0bbea7a409f649e4bc8f4d7084080"},
|
||||
{file = "ruff-0.11.2-py3-none-win_arm64.whl", hash = "sha256:52933095158ff328f4c77af3d74f0379e34fd52f175144cefc1b192e7ccd32b4"},
|
||||
{file = "ruff-0.11.2.tar.gz", hash = "sha256:ec47591497d5a1050175bdf4e1a4e6272cddff7da88a2ad595e1e326041d8d94"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3844,4 +3844,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "715fc8c896dcfa1b15054deeddcdec557ef93af91b26e1c8e4688fe4dbef5296"
|
||||
content-hash = "fb50cb6b291169dce3188560cdb31a14af95647318f8f0f0d718131dbaf1817a"
|
||||
|
||||
@@ -103,6 +103,7 @@ uuid.workspace = true
|
||||
x509-cert.workspace = true
|
||||
redis.workspace = true
|
||||
zerocopy.workspace = true
|
||||
flag-bearer = { version = "0.1.0-rc.4" }
|
||||
|
||||
# jwt stuff
|
||||
jose-jwa = "0.1.2"
|
||||
|
||||
@@ -314,9 +314,9 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
None => {
|
||||
bail!("plain auth requires redis_notifications to be set");
|
||||
}
|
||||
Some(url) => Some(
|
||||
ConnectionWithCredentialsProvider::new_with_static_credentials(url.to_string()),
|
||||
),
|
||||
Some(url) => {
|
||||
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
|
||||
}
|
||||
},
|
||||
("irsa", _) => match (&args.redis_host, args.redis_port) {
|
||||
(Some(host), Some(port)) => Some(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Mock console backend which relies on a user-provided postgres instance.
|
||||
|
||||
use std::io;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
@@ -22,7 +23,6 @@ use crate::control_plane::errors::{
|
||||
};
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::control_plane::{AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::io_error;
|
||||
use crate::intern::RoleNameInt;
|
||||
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
|
||||
use crate::url::ApiUrl;
|
||||
@@ -36,13 +36,13 @@ enum MockApiError {
|
||||
|
||||
impl From<MockApiError> for ControlPlaneError {
|
||||
fn from(e: MockApiError) -> Self {
|
||||
io_error(e).into()
|
||||
io::Error::other(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio_postgres::Error> for ControlPlaneError {
|
||||
fn from(e: tokio_postgres::Error) -> Self {
|
||||
io_error(e).into()
|
||||
io::Error::other(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use std::io;
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::control_plane::client::ApiLockError;
|
||||
use crate::control_plane::messages::{self, ControlPlaneErrorMessage, Reason};
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError, io_error};
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::proxy::retry::CouldRetry;
|
||||
|
||||
/// A go-to error message which doesn't leak any detail.
|
||||
@@ -79,13 +81,13 @@ impl CouldRetry for ControlPlaneError {
|
||||
|
||||
impl From<reqwest::Error> for ControlPlaneError {
|
||||
fn from(e: reqwest::Error) -> Self {
|
||||
io_error(e).into()
|
||||
io::Error::other(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<reqwest_middleware::Error> for ControlPlaneError {
|
||||
fn from(e: reqwest_middleware::Error) -> Self {
|
||||
io_error(e).into()
|
||||
io::Error::other(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,15 +1,9 @@
|
||||
use std::error::Error as StdError;
|
||||
use std::{fmt, io};
|
||||
use std::fmt;
|
||||
|
||||
use anyhow::Context;
|
||||
use measured::FixedCardinalityLabel;
|
||||
use tokio::task::JoinError;
|
||||
|
||||
/// Upcast (almost) any error into an opaque [`io::Error`].
|
||||
pub(crate) fn io_error(e: impl Into<Box<dyn StdError + Send + Sync>>) -> io::Error {
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
}
|
||||
|
||||
/// Marks errors that may be safely shown to a client.
|
||||
/// This trait can be seen as a specialized version of [`ToString`].
|
||||
///
|
||||
|
||||
@@ -163,8 +163,7 @@ fn process_proxy_payload(
|
||||
// other values are unassigned and must not be emitted by senders. Receivers
|
||||
// must drop connections presenting unexpected values here.
|
||||
#[rustfmt::skip] // https://github.com/rust-lang/rustfmt/issues/6384
|
||||
_ => return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
_ => return Err(io::Error::other(
|
||||
format!(
|
||||
"invalid proxy protocol command 0x{:02X}. expected local (0x20) or proxy (0x21)",
|
||||
header.version_and_command
|
||||
@@ -178,21 +177,20 @@ fn process_proxy_payload(
|
||||
TCP_OVER_IPV4 | UDP_OVER_IPV4 => {
|
||||
let addr = payload
|
||||
.try_get::<ProxyProtocolV2HeaderV4>()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
|
||||
.ok_or_else(|| io::Error::other(size_err))?;
|
||||
|
||||
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
|
||||
}
|
||||
TCP_OVER_IPV6 | UDP_OVER_IPV6 => {
|
||||
let addr = payload
|
||||
.try_get::<ProxyProtocolV2HeaderV6>()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
|
||||
.ok_or_else(|| io::Error::other(size_err))?;
|
||||
|
||||
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
|
||||
}
|
||||
// unspecified or unix stream. ignore the addresses
|
||||
_ => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
return Err(io::Error::other(
|
||||
"invalid proxy protocol address family/transport protocol.",
|
||||
));
|
||||
}
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
//! Algorithms for controlling concurrency limits.
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use parking_lot::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
use flag_bearer::{Semaphore, SemaphoreState, Uncloseable};
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::error::Elapsed;
|
||||
|
||||
@@ -65,23 +63,13 @@ pub struct RateLimiterConfig {
|
||||
pub(crate) initial_limit: usize,
|
||||
}
|
||||
|
||||
impl RateLimiterConfig {
|
||||
pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
|
||||
match self.algorithm {
|
||||
RateLimitAlgorithm::Fixed => Box::new(Fixed),
|
||||
RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct LimiterInner {
|
||||
alg: Box<dyn LimitAlgorithm>,
|
||||
available: usize,
|
||||
pub(crate) struct LimiterInner<L: ?Sized> {
|
||||
limit: usize,
|
||||
in_flight: usize,
|
||||
alg: L,
|
||||
}
|
||||
|
||||
impl LimiterInner {
|
||||
impl<L: ?Sized + LimitAlgorithm> LimiterInner<L> {
|
||||
fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
|
||||
if let Some(outcome) = outcome {
|
||||
let sample = Sample {
|
||||
@@ -92,21 +80,26 @@ impl LimiterInner {
|
||||
self.limit = self.alg.update(self.limit, sample);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn take(&mut self, ready: &Notify) -> Option<()> {
|
||||
if self.available >= 1 {
|
||||
self.available -= 1;
|
||||
impl<L: ?Sized> SemaphoreState for LimiterInner<L> {
|
||||
type Params = ();
|
||||
type Permit = ();
|
||||
type Closeable = Uncloseable;
|
||||
|
||||
fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
|
||||
if self.in_flight < self.limit {
|
||||
self.in_flight += 1;
|
||||
|
||||
// tell the next in the queue that there is a permit ready
|
||||
if self.available >= 1 {
|
||||
ready.notify_one();
|
||||
}
|
||||
Some(())
|
||||
Ok(p)
|
||||
} else {
|
||||
None
|
||||
Err(p)
|
||||
}
|
||||
}
|
||||
|
||||
fn release(&mut self, _p: Self::Permit) {
|
||||
self.in_flight -= 1;
|
||||
}
|
||||
}
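The new limiter state delegates queueing to flag-bearer's semaphore and only keeps the admission bookkeeping. A std-only sketch of just that bookkeeping is below; the names are illustrative and the crate's queueing/fairness behavior is intentionally omitted.

// Std-only sketch of the limiter-state bookkeeping above: admit while under the
// current limit, undo on release. The FIFO queueing that flag-bearer's Semaphore
// adds on top is deliberately left out.
struct SketchLimiterState {
    limit: usize,
    in_flight: usize,
}

impl SketchLimiterState {
    fn try_acquire(&mut self) -> bool {
        if self.in_flight < self.limit {
            self.in_flight += 1;
            true
        } else {
            false
        }
    }

    fn release(&mut self) {
        self.in_flight -= 1;
    }
}

fn main() {
    let mut s = SketchLimiterState { limit: 1, in_flight: 0 };
    assert!(s.try_acquire());
    assert!(!s.try_acquire()); // over the limit: a real caller would wait in the semaphore queue
    s.release();
    assert!(s.try_acquire());
}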
|
||||
|
||||
/// Limits the number of concurrent jobs.
|
||||
@@ -116,11 +109,9 @@ impl LimiterInner {
|
||||
///
|
||||
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
|
||||
/// caused by overload (loss).
|
||||
pub(crate) struct DynamicLimiter {
|
||||
config: RateLimiterConfig,
|
||||
inner: Mutex<LimiterInner>,
|
||||
// to notify when a token is available
|
||||
ready: Notify,
|
||||
pub(crate) struct DynamicLimiter<L: ?Sized = dyn LimitAlgorithm> {
|
||||
disabled: bool,
|
||||
sem: Semaphore<LimiterInner<L>>,
|
||||
}
|
||||
|
||||
/// A concurrency token, required to run a job.
|
||||
@@ -140,22 +131,27 @@ struct LimiterState {
|
||||
limit: usize,
|
||||
}
|
||||
|
||||
impl<L> DynamicLimiter<L> {
|
||||
pub(crate) fn new_inner(initial_limit: usize, alg: L) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
disabled: initial_limit == 0,
|
||||
sem: Semaphore::new_fifo(LimiterInner {
|
||||
limit: initial_limit,
|
||||
in_flight: 0,
|
||||
alg,
|
||||
}),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl DynamicLimiter {
|
||||
/// Create a limiter with a given limit control algorithm.
|
||||
pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
|
||||
let ready = Notify::new();
|
||||
ready.notify_one();
|
||||
|
||||
Arc::new(Self {
|
||||
inner: Mutex::new(LimiterInner {
|
||||
alg: config.create_rate_limit_algorithm(),
|
||||
available: config.initial_limit,
|
||||
limit: config.initial_limit,
|
||||
in_flight: 0,
|
||||
}),
|
||||
ready,
|
||||
config,
|
||||
})
|
||||
let initial_limit = config.initial_limit;
|
||||
match config.algorithm {
|
||||
RateLimitAlgorithm::Fixed => DynamicLimiter::new_inner(initial_limit, Fixed),
|
||||
RateLimitAlgorithm::Aimd { conf } => DynamicLimiter::new_inner(initial_limit, conf),
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
|
||||
@@ -163,27 +159,21 @@ impl DynamicLimiter {
|
||||
self: &Arc<Self>,
|
||||
duration: Duration,
|
||||
) -> Result<Token, Elapsed> {
|
||||
tokio::time::timeout(duration, self.acquire()).await?
|
||||
tokio::time::timeout(duration, self.acquire()).await
|
||||
}
|
||||
|
||||
/// Try to acquire a concurrency [Token].
|
||||
async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
|
||||
if self.config.initial_limit == 0 {
|
||||
async fn acquire(self: &Arc<Self>) -> Token {
|
||||
if self.disabled {
|
||||
// If the rate limiter is disabled, we can always acquire a token.
|
||||
Ok(Token::disabled())
|
||||
Token::disabled()
|
||||
} else {
|
||||
let mut notified = pin!(self.ready.notified());
|
||||
let mut ready = notified.as_mut().enable();
|
||||
loop {
|
||||
if ready {
|
||||
let mut inner = self.inner.lock();
|
||||
if inner.take(&self.ready).is_some() {
|
||||
break Ok(Token::new(self.clone()));
|
||||
}
|
||||
notified.set(self.ready.notified());
|
||||
match self.sem.acquire(()).await {
|
||||
Err(close) => close.never(),
|
||||
Ok(permit) => {
|
||||
permit.take();
|
||||
Token::new(self.clone())
|
||||
}
|
||||
notified.as_mut().await;
|
||||
ready = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -200,27 +190,20 @@ impl DynamicLimiter {
|
||||
} else {
|
||||
tracing::debug!("outcome is {:?}", outcome);
|
||||
}
|
||||
if self.config.initial_limit == 0 {
|
||||
if self.disabled {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
|
||||
inner.update_limit(start.elapsed(), outcome);
|
||||
|
||||
inner.in_flight -= 1;
|
||||
if inner.in_flight < inner.limit {
|
||||
inner.available = inner.limit - inner.in_flight;
|
||||
// At least 1 permit is now available
|
||||
self.ready.notify_one();
|
||||
}
|
||||
self.sem.with_state(|s| {
|
||||
s.update_limit(start.elapsed(), outcome);
|
||||
s.release(());
|
||||
});
|
||||
}
|
||||
|
||||
/// The current state of the limiter.
|
||||
#[cfg(test)]
|
||||
fn state(&self) -> LimiterState {
|
||||
let inner = self.inner.lock();
|
||||
LimiterState { limit: inner.limit }
|
||||
self.sem.with_state(|s| LimiterState { limit: s.limit })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
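The hunk above reworks the proxy's adaptive concurrency limiter around a generic semaphore whose state owns the limit algorithm (Fixed or AIMD). As a reading aid only, here is a minimal standalone sketch of the AIMD rule such a limiter applies; the class name and constants below are illustrative assumptions, not the proxy's actual types.

# Illustrative sketch only (not the proxy's code): additive-increase /
# multiplicative-decrease of a concurrency limit based on observed outcomes.
from dataclasses import dataclass


@dataclass
class AimdSketch:
    limit: int
    min_limit: int = 1
    max_limit: int = 100
    increase_by: int = 1           # additive increase after a healthy sample
    decrease_factor: float = 0.75  # multiplicative decrease after an overload sample

    def update(self, overloaded: bool) -> int:
        if overloaded:
            # back off quickly when the downstream signals overload
            self.limit = max(self.min_limit, int(self.limit * self.decrease_factor))
        else:
            # probe for more capacity slowly while things look healthy
            self.limit = min(self.max_limit, self.limit + self.increase_by)
        return self.limit


if __name__ == "__main__":
    aimd = AimdSketch(limit=10)
    for overloaded in (False, False, True, False):
        print("new limit:", aimd.update(overloaded))
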
@@ -143,6 +143,8 @@ impl ConnectionWithCredentialsProvider {
db: 0,
username: Some(username),
password: Some(password.clone()),
// TODO: switch to RESP3 after testing new client version.
protocol: redis::ProtocolVersion::RESP2,
},
})
}

@@ -19,7 +19,7 @@ fn json_value_to_pg_text(value: &Value) -> Option<String> {
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),

// avoid escaping here, as we pass this as a parameter
Value::String(s) => Some(s.to_string()),
Value::String(s) => Some(s.clone()),

// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),

@@ -866,7 +866,7 @@ impl QueryData {
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();

let res = match select(
match select(
pin!(query_to_json(
config,
&mut *inner,
@@ -889,7 +889,7 @@ impl QueryData {
// The query failed with an error
Either::Left((Err(e), __not_yet_cancelled)) => {
discard.discard();
return Err(e);
Err(e)
}
// The query was cancelled.
Either::Right((_cancelled, query)) => {
@@ -930,8 +930,7 @@ impl QueryData {
}
}
}
};
res
}
}
}


@@ -15,7 +15,7 @@ use tracing::warn;
use crate::cancellation::CancellationHandler;
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::{ReportableError, io_error};
use crate::error::ReportableError;
use crate::metrics::Metrics;
use crate::proxy::{ClientMode, ErrorSource, handle_client};
use crate::rate_limiter::EndpointRateLimiter;
@@ -50,23 +50,23 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
let this = self.project();
let mut stream = this.stream;

ready!(stream.as_mut().poll_ready(cx).map_err(io_error))?;
ready!(stream.as_mut().poll_ready(cx).map_err(io::Error::other))?;

this.send.put(buf);
match stream.as_mut().start_send(Frame::binary(this.send.split())) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(e) => Poll::Ready(Err(io_error(e))),
Err(e) => Poll::Ready(Err(io::Error::other(e))),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = self.project().stream;
stream.poll_flush(cx).map_err(io_error)
stream.poll_flush(cx).map_err(io::Error::other)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = self.project().stream;
stream.poll_close(cx).map_err(io_error)
stream.poll_close(cx).map_err(io::Error::other)
}
}

@@ -97,7 +97,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
}

let res = ready!(this.stream.as_mut().poll_next(cx));
match res.transpose().map_err(io_error)? {
match res.transpose().map_err(io::Error::other)? {
Some(message) => match message.opcode {
OpCode::Ping => {}
OpCode::Pong => {}
@@ -105,7 +105,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
// We expect to see only binary messages.
let error = "unexpected text message in the websocket";
warn!(length = message.payload.len(), error);
return Poll::Ready(Err(io_error(error)));
return Poll::Ready(Err(io::Error::other(error)));
}
OpCode::Binary | OpCode::Continuation => {
debug_assert!(this.recv.is_empty());

@@ -173,7 +173,7 @@ impl CertResolver {
}

pub fn get_common_names(&self) -> HashSet<String> {
self.certs.keys().map(|s| s.to_string()).collect()
self.certs.keys().cloned().collect()
}
}


@@ -53,7 +53,7 @@ jsonnet = "^0.21.0-rc2"

[tool.poetry.group.dev.dependencies]
mypy = "==1.13.0"
ruff = "^0.7.0"
ruff = "^0.11.2"

[build-system]
requires = ["poetry-core>=1.0.0"]
@@ -109,4 +109,5 @@ select = [
"W", # pycodestyle
"B", # bugbear
"UP", # pyupgrade
"TC", # flake8-type-checking
]

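The pyproject.toml hunk above bumps ruff and enables the "TC" (flake8-type-checking) rule; the Python hunks that follow apply that rule by moving annotation-only imports under an "if TYPE_CHECKING:" block. A small illustrative example of the pattern (the module and names here are made up, not taken from the repository):

# Illustrative example of the flake8-type-checking ("TC") pattern applied below.
from __future__ import annotations  # annotations stay lazy, so the guarded
                                    # import is only needed by the type checker
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # imported for annotations only; not imported at runtime
    from pathlib import Path


def count_parts(p: Path) -> int:
    return len(p.parts)
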
@@ -8,9 +8,12 @@
from __future__ import annotations

import argparse
from typing import TYPE_CHECKING

import psycopg2
from psycopg2.extensions import connection as PgConnection

if TYPE_CHECKING:
from psycopg2.extensions import connection as PgConnection


def main(args: argparse.Namespace):

@@ -7,13 +7,13 @@ import logging
import signal
import sys
from collections import defaultdict
from collections.abc import Awaitable
from dataclasses import dataclass
from typing import TYPE_CHECKING

import aiohttp

if TYPE_CHECKING:
from collections.abc import Awaitable
from typing import Any



@@ -24,9 +24,9 @@ use pageserver_api::controller_api::{
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
};
use pageserver_api::models::{
DetachBehavior, TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantShardSplitRequest, TenantTimeTravelRequest, TimelineArchivalConfigRequest,
TimelineCreateRequest,
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
@@ -582,6 +582,32 @@ async fn handle_tenant_timeline_download_heatmap_layers(
json_response(StatusCode::OK, ())
}

async fn handle_tenant_timeline_lsn_lease(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;

check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;

let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};

let lsn_lease_request = json_request::<LsnLeaseRequest>(&mut req).await?;

service
.tenant_timeline_lsn_lease(tenant_id, timeline_id, lsn_lease_request.lsn)
.await?;

json_response(StatusCode::OK, ())
}

// For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
// and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
// compare to, so we can just filter out our well known ID format with regexes.
@@ -2192,6 +2218,17 @@ pub fn make_router(
)
},
)
// LSN lease passthrough to all shards
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_lsn_lease,
RequestName("v1_tenant_timeline_lsn_lease"),
)
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(

@@ -1,6 +1,6 @@
use pageserver_api::models::detach_ancestor::AncestorDetached;
use pageserver_api::models::{
DetachBehavior, LocationConfig, LocationConfigListResponse, PageserverUtilization,
DetachBehavior, LocationConfig, LocationConfigListResponse, LsnLease, PageserverUtilization,
SecondaryProgress, TenantScanRemoteStorageResponse, TenantShardSplitRequest,
TenantShardSplitResponse, TenantWaitLsnRequest, TimelineArchivalConfigRequest,
TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, TopTenantShardsResponse,
@@ -10,6 +10,7 @@ use pageserver_client::BlockUnblock;
use pageserver_client::mgmt_api::{Client, Result};
use reqwest::StatusCode;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;

/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
@@ -195,6 +196,22 @@ impl PageserverClient {
)
}

pub(crate) async fn timeline_lease_lsn(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<LsnLease> {
measured_request!(
"timeline_lease_lsn",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner
.timeline_init_lsn_lease(tenant_shard_id, timeline_id, lsn)
.await
)
}

pub(crate) async fn tenant_shard_split(
&self,
tenant_shard_id: TenantShardId,

@@ -1369,6 +1369,65 @@ impl Persistence {
Ok(timeline_from_db)
}

/// Set `delete_at` for the given timeline
pub(crate) async fn timeline_set_deleted_at(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timelines;

let deletion_time = chrono::Local::now().to_utc();
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.set(timelines::deleted_at.eq(Some(deletion_time)))
.execute(conn)
.await?;

match updated {
0 => Ok(()),
1 => Ok(()),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
updated
))),
}
})
})
.await
}

/// Load timeline from db. Returns `None` if not present.
///
/// Only works if `deleted_at` is set, so you should call [`Self::timeline_set_deleted_at`] before.
pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl;

let tenant_id = &tenant_id;
let timeline_id = &timeline_id;
self.with_measured_conn(DatabaseOperation::GetTimeline, move |conn| {
Box::pin(async move {
diesel::delete(dsl::timelines)
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
.filter(dsl::deleted_at.is_not_null())
.execute(conn)
.await?;
Ok(())
})
})
.await?;

Ok(())
}

/// Loads a list of all timelines from database.
pub(crate) async fn list_timelines_for_tenant(
&self,
@@ -1491,6 +1550,34 @@ impl Persistence {

Ok(timeline_from_db)
}
/// List pending operations for a given timeline (including tenant-global ones)
pub(crate) async fn list_pending_ops_for_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;

let timelines_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
.filter(
dsl::timeline_id
.eq(timeline_id.to_string())
.or(dsl::timeline_id.eq("")),
)
.load(conn)
.await?;
Ok(from_db)
})
})
.await?;

Ok(timelines_from_db)
}

/// Delete all pending ops for the given timeline.
///
@@ -1974,7 +2061,7 @@ impl ToSql<crate::schema::sql_types::PgLsn, Pg> for LsnWrapper {
}
}

#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
#[derive(Insertable, AsChangeset, Clone)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelinePersistence {
pub(crate) tenant_id: String,

@@ -12,7 +12,7 @@ use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime};

use anyhow::Context;
use context_iterator::TenantShardContextIterator;
@@ -34,7 +34,7 @@ use pageserver_api::controller_api::{
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use pageserver_api::models::{
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode,
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease,
PageserverUtilization, SecondaryProgress, ShardParameters, TenantConfig,
TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
@@ -60,6 +60,7 @@ use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use utils::completion::Barrier;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::Gate;
use utils::{failpoint_support, pausable_failpoint};

@@ -152,6 +153,7 @@ enum TenantOperations {
TimelineGcBlockUnblock,
DropDetached,
DownloadHeatmapLayers,
TimelineLsnLease,
}

#[derive(Clone, strum_macros::Display)]
@@ -3987,6 +3989,75 @@ impl Service {
Ok(())
}

pub(crate) async fn tenant_timeline_lsn_lease(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<LsnLease, ApiError> {
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineLsnLease,
)
.await;

let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();

// If the request got an unsharded tenant id, then apply
// the operation to all shards. Otherwise, apply it to a specific shard.
let shards_range = TenantShardId::tenant_range(tenant_id);

for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");

targets.push((*tenant_shard_id, node.clone()));
}
}
targets
};

let res = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.timeline_lease_lsn(tenant_shard_id, timeline_id, lsn)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;

let mut valid_until = None;
for r in res {
match r {
Ok(lease) => {
if let Some(ref mut valid_until) = valid_until {
*valid_until = std::cmp::min(*valid_until, lease.valid_until);
} else {
valid_until = Some(lease.valid_until);
}
}
Err(e) => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
}
}
}
Ok(LsnLease {
valid_until: valid_until.unwrap_or_else(SystemTime::now),
})
}

pub(crate) async fn tenant_timeline_download_heatmap_layers(
&self,
tenant_shard_id: TenantShardId,

@@ -4,7 +4,7 @@ use std::time::Duration;

use pageserver_api::controller_api::ShardSchedulingPolicy;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::{Rng, thread_rng};
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use utils::shard::TenantShardId;
@@ -64,17 +64,22 @@ impl ChaosInjector {
let mut interval = tokio::time::interval(self.interval);
#[derive(Debug)]
enum ChaosEvent {
ShuffleTenant,
ForceKill,
MigrationsToSecondary,
ForceKillController,
GracefulMigrationsAnywhere,
}
loop {
let cron_interval = self.get_cron_interval_sleep_future();
let chaos_type = tokio::select! {
_ = interval.tick() => {
ChaosEvent::ShuffleTenant
if thread_rng().gen_bool(0.5) {
ChaosEvent::MigrationsToSecondary
} else {
ChaosEvent::GracefulMigrationsAnywhere
}
}
Some(_) = maybe_sleep(cron_interval) => {
ChaosEvent::ForceKill
ChaosEvent::ForceKillController
}
_ = cancel.cancelled() => {
tracing::info!("Shutting down");
@@ -83,16 +88,29 @@ impl ChaosInjector {
};
tracing::info!("Chaos iteration: {chaos_type:?}...");
match chaos_type {
ChaosEvent::ShuffleTenant => {
self.inject_chaos().await;
ChaosEvent::MigrationsToSecondary => {
self.inject_migrations_to_secondary();
}
ChaosEvent::ForceKill => {
ChaosEvent::GracefulMigrationsAnywhere => {
self.inject_graceful_migrations_anywhere();
}
ChaosEvent::ForceKillController => {
self.force_kill().await;
}
}
}
}

fn is_shard_eligible_for_chaos(&self, shard: &TenantShard) -> bool {
// - Skip non-active scheduling policies, so that a shard with a policy like Pause can
// be pinned without being disrupted by us.
// - Skip shards doing a graceful migration already, so that we allow these to run to
// completion rather than only exercising the first part and then cancelling with
// some other chaos.
!matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
&& shard.get_preferred_node().is_none()
}

/// If a shard has a secondary and attached location, then re-assign the secondary to be
/// attached and the attached to be secondary.
///
@@ -108,13 +126,7 @@ impl ChaosInjector {
.get_mut(&tenant_shard_id)
.expect("Held lock between choosing ID and this get");

if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
// Skip non-active scheduling policies, so that a shard with a policy like Pause can
// be pinned without being disrupted by us.
tracing::info!(
"Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
shard.get_scheduling_policy()
);
if !self.is_shard_eligible_for_chaos(shard) {
return;
}

@@ -152,7 +164,77 @@ impl ChaosInjector {
std::process::exit(1);
}

async fn inject_chaos(&mut self) {
// Unlike [`Self::inject_migrations_to_secondary`], this function will not only cut over to secondary, it
// will migrate a tenant to a random node in its home AZ using a graceful migration of the same type
// that my be initiated by an API caller using prewarm=true.
//
// This is a much more expensive operation in terms of I/O and time, as we will fully warm up
// some new location in order to migrate the tenant there. For that reason we do far fewer of these.
fn inject_graceful_migrations_anywhere(&mut self) {
let batch_size = 1;
let mut inner = self.service.inner.write().unwrap();
let (nodes, tenants, _scheduler) = inner.parts_mut();

let mut candidates = tenants
.values_mut()
.filter(|shard| self.is_shard_eligible_for_chaos(shard))
.collect::<Vec<_>>();

tracing::info!(
"Injecting chaos: found {} candidates for graceful migrations anywhere",
candidates.len()
);

let mut victims: Vec<&mut TenantShard> = Vec::new();

// Pick our victims: use a hand-rolled loop rather than choose_multiple() because we want
// to take the mutable refs from our candidates rather than ref'ing them.
while !candidates.is_empty() && victims.len() < batch_size {
let i = thread_rng().gen_range(0..candidates.len());
victims.push(candidates.swap_remove(i));
}

for victim in victims.into_iter() {
// Find a node in the same AZ as the shard, or if the shard has no AZ preference, which
// is not where they are currently attached.
let candidate_nodes = nodes
.values()
.filter(|node| {
if let Some(preferred_az) = victim.preferred_az() {
node.get_availability_zone_id() == preferred_az
} else if let Some(attached) = *victim.intent.get_attached() {
node.get_id() != attached
} else {
true
}
})
.collect::<Vec<_>>();

let Some(victim_node) = candidate_nodes.choose(&mut thread_rng()) else {
// This can happen if e.g. we are in a small region with only one pageserver per AZ.
tracing::info!(
"no candidate nodes found for migrating shard {tenant_shard_id} within its home AZ",
tenant_shard_id = victim.tenant_shard_id
);
continue;
};

// This doesn't change intent immediately: next iteration of Service::optimize_all should do that. We avoid
// doing it here because applying optimizations requires dropping lock to do some async work to check the optimisation
// is valid given remote state, and it would be a shame to duplicate that dance here.
tracing::info!(
"Injecting chaos: migrate {} to {}",
victim.tenant_shard_id,
victim_node
);
victim.set_preferred_node(Some(victim_node.get_id()));
}
}

/// Migrations of attached locations to their secondary location. This exercises reconciliation in general,
/// live migration in particular, and the pageserver code for cleanly shutting down and starting up tenants
/// during such migrations.
fn inject_migrations_to_secondary(&mut self) {
// Pick some shards to interfere with
let batch_size = 128;
let mut inner = self.service.inner.write().unwrap();

@@ -160,9 +160,8 @@ pub(crate) struct ScheduleRequest {
}

struct ReconcilerHandle {
tx: UnboundedSender<(ScheduleRequest, Arc<CancellationToken>)>,
#[allow(clippy::type_complexity)]
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), Arc<CancellationToken>>>,
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
cancel: CancellationToken,
}

@@ -172,13 +171,13 @@ impl ReconcilerHandle {
&self,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
) -> Arc<CancellationToken> {
) -> CancellationToken {
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
if let Entry::Occupied(entry) = &entry {
let cancel: &CancellationToken = entry.get();
cancel.cancel();
}
entry.insert(Arc::new(self.cancel.child_token())).clone()
entry.insert(self.cancel.child_token()).clone()
}
/// Cancel an ongoing reconciliation
fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
@@ -197,7 +196,7 @@ impl ReconcilerHandle {

pub(crate) struct SafekeeperReconciler {
service: Arc<Service>,
rx: UnboundedReceiver<(ScheduleRequest, Arc<CancellationToken>)>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
cancel: CancellationToken,
}

@@ -243,7 +242,7 @@ impl SafekeeperReconciler {
.await;
}
}
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: Arc<CancellationToken>) {
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
let req_host = req.safekeeper.skp.host.clone();
match req.kind {
SafekeeperTimelineOpKind::Pull => {
@@ -300,36 +299,96 @@ impl SafekeeperReconciler {
SafekeeperTimelineOpKind::Delete => {
let tenant_id = req.tenant_id;
if let Some(timeline_id) = req.timeline_id {
self.reconcile_inner(
let deleted = self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}");
},
req_cancel,
)
.await;
if deleted {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
} else {
self.reconcile_inner(
req,
async |client| client.delete_tenant(tenant_id).await,
|_resp| {
tracing::info!("deleted tenant from {req_host}");
},
req_cancel,
)
.await;
let deleted = self
.reconcile_inner(
req,
async |client| client.delete_tenant(tenant_id).await,
|_resp| {
tracing::info!(%tenant_id, "deleted tenant from {req_host}");
},
req_cancel,
)
.await;
if deleted {
self.delete_tenant_timelines_from_db(tenant_id).await;
}
}
}
}
}
async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
match self
.service
.persistence
.list_pending_ops_for_timeline(tenant_id, timeline_id)
.await
{
Ok(list) => {
if !list.is_empty() {
tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
return;
}
}
Err(e) => {
tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}");
return;
}
}
tracing::info!(%tenant_id, %timeline_id, "deleting timeline from db after all reconciles succeeded");
// In theory we could crash right after deleting the op from the db and right before reaching this,
// but then we'll boot up with a timeline that has deleted_at set, so hopefully we'll issue deletion ops for it again.
if let Err(err) = self
.service
.persistence
.delete_timeline(tenant_id, timeline_id)
.await
{
tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
}
}
async fn delete_tenant_timelines_from_db(&self, tenant_id: TenantId) {
let timeline_list = match self
.service
.persistence
.list_timelines_for_tenant(tenant_id)
.await
{
Ok(timeline_list) => timeline_list,
Err(e) => {
tracing::warn!(%tenant_id, "couldn't query timelines: {e}");
return;
}
};
for timeline in timeline_list {
let Ok(timeline_id) = TimelineId::from_str(&timeline.timeline_id) else {
tracing::warn!("Invalid timeline ID in database {}", timeline.timeline_id);
continue;
};
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
}
/// Returns whether the reconciliation happened successfully
async fn reconcile_inner<T, F, U>(
&self,
req: ScheduleRequest,
closure: impl Fn(SafekeeperClient) -> F,
log_success: impl FnOnce(T) -> U,
req_cancel: Arc<CancellationToken>,
) where
req_cancel: CancellationToken,
) -> bool
where
F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
{
let jwt = self
@@ -373,11 +432,11 @@ impl SafekeeperReconciler {
req.safekeeper.skp.host
);
}
return;
return true;
}
Err(mgmt_api::Error::Cancelled) => {
// On cancellation, the code that issued it will take care of removing db entries (if needed)
return;
return false;
}
Err(e) => {
tracing::info!(

@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -313,25 +313,32 @@ impl Service {
);
return Ok(());
};
self.persistence
.timeline_set_deleted_at(tenant_id, timeline_id)
.await?;
let all_sks = tl
.new_sk_set
.iter()
.flat_map(|sks| {
sks.iter()
.map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude))
})
.chain(
tl.sk_set
.iter()
.map(|v| (*v, SafekeeperTimelineOpKind::Delete)),
)
.collect::<HashMap<_, _>>();
.flatten()
.chain(tl.sk_set.iter())
.collect::<HashSet<_>>();

// Schedule reconciliations
for &sk_id in all_sks.iter() {
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: tl.generation,
op_kind: SafekeeperTimelineOpKind::Delete,
sk_id: *sk_id,
};
tracing::info!("writing pending op for sk id {sk_id}");
self.persistence.insert_pending_op(pending_op).await?;
}
{
let mut locked = self.inner.write().unwrap();
for (sk_id, kind) in all_sks {
let sk_id = NodeId(sk_id as u64);
for sk_id in all_sks {
let sk_id = NodeId(*sk_id as u64);
let Some(sk) = locked.safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {sk_id}"
@@ -345,7 +352,7 @@ impl Service {
tenant_id,
timeline_id: Some(timeline_id),
generation: tl.generation as u32,
kind,
kind: SafekeeperTimelineOpKind::Delete,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
}
@@ -379,32 +386,50 @@ impl Service {
})
.collect::<Result<Vec<_>, ApiError>>()?;

// Remove pending ops from db.
// Remove pending ops from db, and set `deleted_at`.
// We cancel them in a later iteration once we hold the state lock.
for (timeline_id, _timeline) in timeline_list.iter() {
self.persistence
.remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
.await?;
self.persistence
.timeline_set_deleted_at(tenant_id, *timeline_id)
.await?;
}

let mut locked = self.inner.write().unwrap();

// The list of safekeepers that have any of the timelines
let mut sk_list = HashSet::new();

// List all pending ops for all timelines, cancel them
for (timeline_id, timeline) in timeline_list.iter() {
for (_timeline_id, timeline) in timeline_list.iter() {
let sk_iter = timeline
.sk_set
.iter()
.chain(timeline.new_sk_set.iter().flatten())
.map(|id| NodeId(*id as u64));
for sk_id in sk_iter.clone() {
sk_list.extend(sk_iter);
}

for &sk_id in sk_list.iter() {
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: String::new(),
generation: i32::MAX,
op_kind: SafekeeperTimelineOpKind::Delete,
sk_id: sk_id.0 as i64,
};
tracing::info!("writing pending op for sk id {sk_id}");
self.persistence.insert_pending_op(pending_op).await?;
}

let mut locked = self.inner.write().unwrap();

for (timeline_id, _timeline) in timeline_list.iter() {
for sk_id in sk_list.iter() {
locked
.safekeeper_reconcilers
.cancel_reconciles_for_timeline(sk_id, tenant_id, Some(*timeline_id));
.cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
}
sk_list.extend(sk_iter);
}

// unwrap is safe: we return above for an empty timeline list

@@ -4,11 +4,15 @@ Run the regression tests on the cloud instance of Neon

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

import pytest
from fixtures.neon_fixtures import RemotePostgres
from fixtures.pg_version import PgVersion

if TYPE_CHECKING:
from pathlib import Path

from fixtures.neon_fixtures import RemotePostgres
from fixtures.pg_version import PgVersion


@pytest.mark.timeout(7200)

@@ -2,11 +2,12 @@ from __future__ import annotations

from dataclasses import dataclass
from enum import StrEnum
from typing import Any
from typing import TYPE_CHECKING, Any

import jwt

from fixtures.common_types import TenantId
if TYPE_CHECKING:
from fixtures.common_types import TenantId


@dataclass

@@ -15,18 +15,20 @@ from typing import TYPE_CHECKING

import allure
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from _pytest.terminal import TerminalReporter

from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver

if TYPE_CHECKING:
from collections.abc import Callable, Iterator, Mapping

from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from _pytest.terminal import TerminalReporter

from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import NeonPageserver


"""
This file contains fixtures for micro-benchmarks.

@@ -11,7 +11,6 @@ from pathlib import Path
from typing import TYPE_CHECKING, final

import pytest
from _pytest.fixtures import FixtureRequest
from typing_extensions import override

from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
@@ -24,11 +23,14 @@ from fixtures.neon_fixtures import (
VanillaPostgres,
wait_for_last_flush_lsn,
)
from fixtures.pg_stats import PgStatTable

if TYPE_CHECKING:
from collections.abc import Iterator

from _pytest.fixtures import FixtureRequest

from fixtures.pg_stats import PgStatTable


class PgCompare(ABC):
"""Common interface of all postgres implementations, useful for benchmarks.

@@ -4,8 +4,6 @@ import concurrent.futures
from typing import TYPE_CHECKING

import pytest
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

from fixtures.common_types import TenantId
@@ -15,6 +13,9 @@ if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any

from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request


class ComputeReconfigure:
def __init__(self, server: HTTPServer):

@@ -5,6 +5,8 @@ import urllib.parse
import requests
from requests.adapters import HTTPAdapter

from fixtures.log_helper import log


class EndpointHttpClient(requests.Session):
def __init__(
@@ -51,6 +53,7 @@ class EndpointHttpClient(requests.Session):
def metrics(self) -> str:
res = self.get(f"http://localhost:{self.external_port}/metrics")
res.raise_for_status()
log.debug("raw compute metrics: %s", res.text)
return res.text

# Current compute status.

@@ -147,7 +147,7 @@ def fast_import(
pg_distrib_dir,
pg_version,
workdir,
cleanup=not cast(bool, pytestconfig.getoption("--preserve-database-files")),
cleanup=not cast("bool", pytestconfig.getoption("--preserve-database-files")),
) as fi:
yield fi


@@ -10,7 +10,6 @@ import asyncio
import collections
import io
import json
from collections.abc import AsyncIterable
from typing import TYPE_CHECKING, final

import pytest_asyncio
@@ -31,6 +30,7 @@ from h2.settings import SettingCodes
from typing_extensions import override

if TYPE_CHECKING:
from collections.abc import AsyncIterable
from typing import Any


@@ -1,12 +1,15 @@
from __future__ import annotations

from collections import defaultdict
from typing import TYPE_CHECKING

from prometheus_client.parser import text_string_to_metric_families
from prometheus_client.samples import Sample

from fixtures.log_helper import log

if TYPE_CHECKING:
from prometheus_client.samples import Sample


class Metrics:
metrics: dict[str, list[Sample]]
@@ -168,7 +171,6 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
"pageserver_flush_wait_upload_seconds",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),

@@ -7,7 +7,6 @@ import subprocess
import tempfile
import textwrap
from itertools import chain, product
from pathlib import Path
from typing import TYPE_CHECKING, cast

import toml
@@ -15,14 +14,15 @@ import toml
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.common_types import IndexPartDump
from fixtures.pg_version import PgVersion

if TYPE_CHECKING:
from pathlib import Path
from typing import (
Any,
cast,
)

from fixtures.pg_version import PgVersion


# Used to be an ABC. abc.ABC removed due to linter without name change.
class AbstractNeonCli:
@@ -36,7 +36,7 @@ class AbstractNeonCli:
self.extra_env = extra_env
self.binpath = binpath

COMMAND: str = cast(str, None) # To be overwritten by the derived class.
COMMAND: str = cast("str", None) # To be overwritten by the derived class.

def raw_cli(
self,

@@ -14,14 +14,12 @@ import threading
import time
import uuid
from collections import defaultdict
from collections.abc import Iterable, Iterator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
from functools import cached_property
from pathlib import Path
from types import TracebackType
from typing import TYPE_CHECKING, cast
from urllib.parse import quote, urlparse

@@ -34,19 +32,12 @@ import psycopg2.sql
import pytest
import requests
import toml
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from jwcrypto import jwk
from mypy_boto3_kms import KMSClient
from mypy_boto3_s3 import S3Client

# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from pytest_httpserver import HTTPServer
from urllib3.util.retry import Retry

from fixtures import overlayfs
from fixtures.auth_tokens import AuthKeys, TokenScope
@@ -60,7 +51,6 @@ from fixtures.common_types import (
)
from fixtures.compute_migrations import NUM_COMPUTE_MIGRATIONS
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.h2server import H2Server
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.neon_cli import NeonLocalCli, Pagectl
@@ -78,7 +68,6 @@ from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
)
from fixtures.paths import get_test_repo_dir, shared_snapshot_dir
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
LocalFsStorage,
@@ -108,10 +97,21 @@ from fixtures.utils import (
from .neon_api import NeonAPI, NeonApiEndpoint

if TYPE_CHECKING:
from collections.abc import Callable
from collections.abc import Callable, Iterable, Iterator
from types import TracebackType
from typing import Any, Self, TypeVar

from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from mypy_boto3_kms import KMSClient
from mypy_boto3_s3 import S3Client
from pytest_httpserver import HTTPServer
from urllib3.util.retry import Retry

from fixtures.h2server import H2Server
from fixtures.paths import SnapshotDirLocked
from fixtures.pg_version import PgVersion

T = TypeVar("T")

@@ -497,9 +497,9 @@ class NeonEnvBuilder:
else:
self.pageserver_wal_receiver_protocol = PageserverWalReceiverProtocol.INTERPRETED

assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
assert test_name.startswith("test_"), (
"Unexpectedly instantiated from outside a test function"
)
self.test_name = test_name
self.compatibility_neon_binpath = compatibility_neon_binpath
self.compatibility_pg_distrib_dir = compatibility_pg_distrib_dir
@@ -508,12 +508,12 @@ class NeonEnvBuilder:
self.mixdir = self.test_output_dir / "mixdir_neon"

if self.version_combination is not None:
assert (
self.compatibility_neon_binpath is not None
), "the environment variable COMPATIBILITY_NEON_BIN is required when using mixed versions"
assert (
self.compatibility_pg_distrib_dir is not None
), "the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required when using mixed versions"
assert self.compatibility_neon_binpath is not None, (
"the environment variable COMPATIBILITY_NEON_BIN is required when using mixed versions"
)
assert self.compatibility_pg_distrib_dir is not None, (
"the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required when using mixed versions"
)
self.mixdir.mkdir(mode=0o755, exist_ok=True)
self._mix_versions()
self.test_may_use_compatibility_snapshot_binaries = True
@@ -795,9 +795,9 @@ class NeonEnvBuilder:
work = ident_state_dir / "work"
assert upper.is_dir()
assert work.is_dir()
assert (
self.test_overlay_dir not in dst.parents
), "otherwise workdir cleanup below wouldn't work"
assert self.test_overlay_dir not in dst.parents, (
"otherwise workdir cleanup below wouldn't work"
)
# find index, still not mutating state
idxmap = {
existing_ident: idx
@@ -863,9 +863,9 @@ class NeonEnvBuilder:
self.pageserver_remote_storage = ret

def enable_safekeeper_remote_storage(self, kind: RemoteStorageKind):
assert (
self.safekeepers_remote_storage is None
), "safekeepers_remote_storage already configured"
assert self.safekeepers_remote_storage is None, (
"safekeepers_remote_storage already configured"
)

self.safekeepers_remote_storage = self._configure_and_create_remote_storage(
kind, RemoteStorageUser.SAFEKEEPER
@@ -1421,9 +1421,9 @@ class NeonEnv:
assert that there is only one. Tests with multiple pageservers should always use
get_pageserver with an explicit ID.
"""
assert (
len(self.pageservers) == 1
), "env.pageserver must only be used with single pageserver NeonEnv"
assert len(self.pageservers) == 1, (
"env.pageserver must only be used with single pageserver NeonEnv"
)
return self.pageservers[0]

def get_pageserver(self, id: int | None) -> NeonPageserver:
@@ -1614,7 +1614,7 @@ def neon_simple_env(
compatibility_pg_distrib_dir=compatibility_pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
preserve_database_files=cast("bool", pytestconfig.getoption("--preserve-database-files")),
test_name=request.node.name,
test_output_dir=test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
@@ -1683,7 +1683,7 @@ def neon_env_builder(
combination=combination,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
preserve_database_files=cast("bool", pytestconfig.getoption("--preserve-database-files")),
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
test_name=request.node.name,
test_output_dir=test_output_dir,
@@ -3577,9 +3577,9 @@ class NeonProxy(PgProtocol):

@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
def _wait_until_ready(self):
assert (
self._popen and self._popen.poll() is None
), "Proxy exited unexpectedly. Check test log."
assert self._popen and self._popen.poll() is None, (
"Proxy exited unexpectedly. Check test log."
)
requests.get(f"http://{self.host}:{self.http_port}/v1/status")

def http_query(self, query, args, **kwargs):
@@ -3787,9 +3787,9 @@ class NeonAuthBroker:

@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
def _wait_until_ready(self):
assert (
self._popen and self._popen.poll() is None
), "Proxy exited unexpectedly. Check test log."
assert self._popen and self._popen.poll() is None, (
"Proxy exited unexpectedly. Check test log."
)
requests.get(f"http://{self.host}:{self.http_port}/v1/status")

async def query(self, query, args, **kwargs):
@@ -4069,9 +4069,9 @@ class Endpoint(PgProtocol, LogUtils):
m = re.search(r"=\s*(\S+)", line)
assert m is not None, f"malformed config line {line}"
size = m.group(1)
assert size_to_bytes(size) >= size_to_bytes(
"1MB"
), "LFC size cannot be set less than 1MB"
assert size_to_bytes(size) >= size_to_bytes("1MB"), (
"LFC size cannot be set less than 1MB"
)
lfc_path_escaped = str(lfc_path).replace("'", "''")
config_lines = [
f"neon.file_cache_path = '{lfc_path_escaped}'",
@@ -4082,12 +4082,12 @@ class Endpoint(PgProtocol, LogUtils):
] + config_lines
else:
for line in config_lines:
assert (
line.find("neon.max_file_cache_size") == -1
), "Setting LFC parameters is not allowed when LFC is disabled"
assert (
line.find("neon.file_cache_size_limit") == -1
), "Setting LFC parameters is not allowed when LFC is disabled"
assert line.find("neon.max_file_cache_size") == -1, (
"Setting LFC parameters is not allowed when LFC is disabled"
)
assert line.find("neon.file_cache_size_limit") == -1, (
"Setting LFC parameters is not allowed when LFC is disabled"
)

self.config(config_lines)

@@ -4209,7 +4209,7 @@ class Endpoint(PgProtocol, LogUtils):

# Write it back updated
with open(config_path, "w") as file:
log.info(json.dumps(dict(data_dict, **kwargs)))
log.debug(json.dumps(dict(data_dict, **kwargs)))
json.dump(dict(data_dict, **kwargs), file, indent=4)

def respec_deep(self, **kwargs: Any) -> None:
@@ -4226,7 +4226,7 @@ class Endpoint(PgProtocol, LogUtils):
with open(config_path) as f:
data_dict: dict[str, Any] = json.load(f)

log.info("Current compute spec: %s", json.dumps(data_dict, indent=4))
log.debug("Current compute spec: %s", json.dumps(data_dict, indent=4))

for key, value in kwargs.items():
if isinstance(value, dict):
@@ -4238,7 +4238,7 @@ class Endpoint(PgProtocol, LogUtils):
data_dict[key] = value

with open(config_path, "w") as file:
log.info("Updating compute spec to: %s", json.dumps(data_dict, indent=4))
log.debug("Updating compute spec to: %s", json.dumps(data_dict, indent=4))
json.dump(data_dict, file, indent=4)

def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None:
|
||||
@@ -4925,9 +4925,9 @@ class StorageScrubber:
|
||||
healthy = False
|
||||
else:
|
||||
for _, warnings in with_warnings.items():
|
||||
assert (
|
||||
len(warnings) > 0
|
||||
), "with_warnings value should not be empty, running without verbose mode?"
|
||||
assert len(warnings) > 0, (
|
||||
"with_warnings value should not be empty, running without verbose mode?"
|
||||
)
|
||||
if not self._check_line_list_allowed(warnings):
|
||||
healthy = False
|
||||
break
|
||||
@@ -4941,9 +4941,9 @@ class StorageScrubber:
|
||||
healthy = False
|
||||
else:
|
||||
for _, errors in with_errors.items():
|
||||
assert (
|
||||
len(errors) > 0
|
||||
), "with_errors value should not be empty, running without verbose mode?"
|
||||
assert len(errors) > 0, (
|
||||
"with_errors value should not be empty, running without verbose mode?"
|
||||
)
|
||||
if not self._check_line_list_allowed(errors):
|
||||
healthy = False
|
||||
break
|
||||
|
||||
@@ -5,7 +5,10 @@ from __future__ import annotations
|
||||
import argparse
|
||||
import re
|
||||
import sys
|
||||
from collections.abc import Iterable
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
|
||||
|
||||
def scan_pageserver_log_for_errors(
|
||||
|
||||

@@ -7,8 +7,7 @@ import string
import time
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from typing import TYPE_CHECKING, Any

import requests
from requests.adapters import HTTPAdapter

@@ -26,6 +25,9 @@ from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pg_version import PgVersion
from fixtures.utils import EnhancedJSONEncoder, Fn

if TYPE_CHECKING:
from datetime import datetime


class PageserverApiException(Exception):
def __init__(self, message, status_code: int):

@@ -4,18 +4,19 @@ import concurrent.futures
from typing import TYPE_CHECKING

import fixtures.pageserver.remote_storage
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any

from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)


def single_timeline(
neon_env_builder: NeonEnvBuilder,

@@ -5,11 +5,9 @@ import os
import queue
import shutil
import threading
from pathlib import Path
from typing import TYPE_CHECKING

from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import NeonEnv
from fixtures.pageserver.common_types import (
InvalidFileName,
parse_layer_file_name,

@@ -17,8 +15,11 @@ from fixtures.pageserver.common_types import (
from fixtures.remote_storage import LocalFsStorage

if TYPE_CHECKING:
from pathlib import Path
from typing import Any

from fixtures.neon_fixtures import NeonEnv


def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: TenantId):
remote_storage = env.pageserver_remote_storage

@@ -3,13 +3,6 @@ from __future__ import annotations
import time
from typing import TYPE_CHECKING

from mypy_boto3_s3.type_defs import (
DeleteObjectOutputTypeDef,
EmptyResponseMetadataTypeDef,
ListObjectsV2OutputTypeDef,
ObjectTypeDef,
)

from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient

@@ -19,6 +12,13 @@ from fixtures.utils import wait_until
if TYPE_CHECKING:
from typing import Any

from mypy_boto3_s3.type_defs import (
DeleteObjectOutputTypeDef,
EmptyResponseMetadataTypeDef,
ListObjectsV2OutputTypeDef,
ObjectTypeDef,
)


def assert_tenant_state(
pageserver_http: PageserverHttpClient,

@@ -241,9 +241,9 @@ def wait_for_upload_queue_empty(
found = False
for f in finished:
if all([s.labels[label] == f.labels[label] for label in remaining_labels]):
assert (
not found
), "duplicate match, remaining_labels don't uniquely identify sample"
assert not found, (
"duplicate match, remaining_labels don't uniquely identify sample"
)
tl.append((s.labels, int(s.value) - int(f.value)))
found = True
if not found:

@@ -6,13 +6,14 @@ from typing import TYPE_CHECKING
import allure
import pytest
import toml
from _pytest.python import Metafunc

from fixtures.pg_version import PgVersion

if TYPE_CHECKING:
from typing import Any

from _pytest.python import Metafunc


"""
Dynamically parametrize tests by different parameters

@@ -6,7 +6,6 @@ import subprocess
import threading
from fcntl import LOCK_EX, LOCK_UN, flock
from pathlib import Path
from types import TracebackType
from typing import TYPE_CHECKING

import pytest

@@ -18,6 +17,7 @@ from fixtures.utils import allure_attach_from_dir

if TYPE_CHECKING:
from collections.abc import Iterator
from types import TracebackType


BASE_DIR = Path(__file__).parents[2]

@@ -101,9 +101,9 @@ def compatibility_snapshot_dir() -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
return
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
), "COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg(PG_VERSION)` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
assert compatibility_snapshot_dir_env is not None, (
"COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg(PG_VERSION)` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
)
compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
yield compatibility_snapshot_dir

@@ -7,22 +7,24 @@ import os
import re
from dataclasses import dataclass
from enum import StrEnum
from pathlib import Path
from typing import TYPE_CHECKING

import boto3
import toml
from moto.server import ThreadedMotoServer
from mypy_boto3_s3 import S3Client
from typing_extensions import override

from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.common_types import IndexPartDump

if TYPE_CHECKING:
from pathlib import Path
from typing import Any

from mypy_boto3_s3 import S3Client

from fixtures.common_types import TenantId, TenantShardId, TimelineId


TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
TENANT_HEATMAP_FILE_NAME = "heatmap-v1.json"

@@ -448,9 +450,9 @@ class RemoteStorageKind(StrEnum):
env_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
env_access_token = os.getenv("AWS_SESSION_TOKEN")
env_profile = os.getenv("AWS_PROFILE")
assert (
env_access_key and env_secret_key and env_access_token
) or env_profile, "need to specify either access key and secret access key or profile"
assert (env_access_key and env_secret_key and env_access_token) or env_profile, (
"need to specify either access key and secret access key or profile"
)

bucket_name = bucket_name or os.getenv("REMOTE_STORAGE_S3_BUCKET")
assert bucket_name is not None, "no remote storage bucket name provided"

@@ -3,12 +3,11 @@ from __future__ import annotations
from collections.abc import MutableMapping
from typing import TYPE_CHECKING, cast

import pytest

if TYPE_CHECKING:
from collections.abc import MutableMapping
from typing import Any

import pytest
from _pytest.config import Config


@@ -1,10 +1,14 @@
from __future__ import annotations

from fixtures.common_types import TenantId, TimelineId
from typing import TYPE_CHECKING

from fixtures.log_helper import log
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.utils import wait_until

if TYPE_CHECKING:
from fixtures.common_types import TenantId, TimelineId
from fixtures.safekeeper.http import SafekeeperHttpClient


def wait_walreceivers_absent(
sk_http_cli: SafekeeperHttpClient, tenant_id: TenantId, timeline_id: TimelineId

@@ -3,12 +3,13 @@ from __future__ import annotations
from typing import TYPE_CHECKING

import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser

if TYPE_CHECKING:
from typing import Any

from _pytest.config import Config
from _pytest.config.argparsing import Parser


"""
This plugin allows tests to be marked as slow using pytest.mark.slow. By default slow

@@ -5,9 +5,7 @@ from typing import TYPE_CHECKING

import pytest
import requests
from pytest_httpserver import HTTPServer
from werkzeug.datastructures import Headers
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

from fixtures.log_helper import log

@@ -15,6 +13,9 @@ from fixtures.log_helper import log
if TYPE_CHECKING:
from typing import Any

from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request


class StorageControllerProxy:
def __init__(self, server: HTTPServer):

@@ -19,7 +19,6 @@ from urllib.parse import urlencode
import allure
import pytest
import zstandard
from psycopg2.extensions import cursor
from typing_extensions import override

from fixtures.common_types import Id, Lsn

@@ -34,6 +33,8 @@ if TYPE_CHECKING:
from collections.abc import Iterable
from typing import IO

from psycopg2.extensions import cursor

from fixtures.common_types import TimelineId
from fixtures.neon_fixtures import PgBin

@@ -512,7 +513,9 @@ def assert_no_errors(log_file: Path, service: str, allowed_errors: list[str]):
for _lineno, error in errors:
log.info(f"not allowed {service} error: {error.strip()}")

assert not errors, f"First log error on {service}: {errors[0]}\nHint: use scripts/check_allowed_errors.sh to test any new allowed_error you add"
assert not errors, (
f"First log error on {service}: {errors[0]}\nHint: use scripts/check_allowed_errors.sh to test any new allowed_error you add"
)


def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: set[str]):

@@ -550,18 +553,18 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: set[str

left_list, right_list = map(build_hash_list, [left, right])

assert len(left_list) == len(
right_list
), f"unexpected number of files on tar files, {len(left_list)} != {len(right_list)}"
assert len(left_list) == len(right_list), (
f"unexpected number of files on tar files, {len(left_list)} != {len(right_list)}"
)

mismatching: set[str] = set()

for left_tuple, right_tuple in zip(left_list, right_list, strict=False):
left_path, left_hash = left_tuple
right_path, right_hash = right_tuple
assert (
left_path == right_path
), f"file count matched, expected these to be same paths: {left_path}, {right_path}"
assert left_path == right_path, (
f"file count matched, expected these to be same paths: {left_path}, {right_path}"
)
if left_hash != right_hash:
mismatching.add(left_path)

@@ -721,3 +724,20 @@ def skip_on_ci(reason: str):
os.getenv("CI", "false") == "true",
reason=reason,
)


def shared_buffers_for_max_cu(max_cu: float) -> str:
"""
Returns the string value of shared_buffers for the given max CU.
Use shared_buffers size like in production for max CU compute.
See https://github.com/neondatabase/cloud/blob/877e33b4289a471b8f0a35c84009846358f3e5a3/goapp/controlplane/internal/pkg/compute/computespec/pg_settings.go#L405

e.g. // 2 CU: 225mb; 4 CU: 450mb; 8 CU: 900mb
"""
ramBytes = int(4096 * max_cu * 1024 * 1024)
maxConnections = max(100, min(int(ramBytes / 9531392), 4000))
maxWorkerProcesses = 12 + int(max_cu * 2)
maxBackends = 1 + maxConnections + maxWorkerProcesses
sharedBuffersMb = int(max(128, (1023 + maxBackends * 256) / 1024))
sharedBuffers = int(sharedBuffersMb * 1024 / 8)
return str(sharedBuffers)
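By this arithmetic, the helper above lands close to the figures quoted in its docstring: for an 8 CU compute it computes roughly 909 MB and returns that size in 8 KB units, which is the bare-integer form Postgres accepts for shared_buffers. A quick sanity check, assuming the helper is importable from fixtures.utils as the test_bulk_update hunk further down does:

    from fixtures.utils import shared_buffers_for_max_cu

    pages = int(shared_buffers_for_max_cu(8.0))  # 116352 eight-KB pages by the formula above
    print(f"{pages * 8 / 1024:.0f} MB")          # ~909 MB, matching the "8 CU: 900mb" note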

@@ -3,7 +3,6 @@ from __future__ import annotations
import threading
from typing import TYPE_CHECKING

from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,

@@ -17,6 +16,8 @@ from fixtures.pageserver.utils import wait_for_last_record_lsn
if TYPE_CHECKING:
from typing import Any

from fixtures.common_types import TenantId, TimelineId

# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
ENDPOINT_LOCK = threading.Lock()

@@ -7,14 +7,17 @@ from __future__ import annotations
import hashlib
import os
import time
from typing import TYPE_CHECKING

import clickhouse_connect
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import RemotePostgres
from fixtures.utils import wait_until

if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres


def query_clickhouse(
client,

@@ -7,14 +7,17 @@ from __future__ import annotations
import json
import os
import time
from typing import TYPE_CHECKING

import psycopg2
import pytest
import requests
from fixtures.log_helper import log
from fixtures.neon_fixtures import RemotePostgres
from fixtures.utils import wait_until

if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres


class DebeziumAPI:
"""

@@ -7,18 +7,19 @@ from __future__ import annotations
from typing import TYPE_CHECKING

import fixtures.pageserver.many_tenants as many_tenants
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.pageserver.utils import wait_until_all_tenants_state

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any

from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)


def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
"""

@@ -7,16 +7,19 @@ import threading
import time
import timeit
from contextlib import closing
from typing import TYPE_CHECKING

import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.utils import wait_until
from prometheus_client.samples import Sample

if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import NeonPageserver
from prometheus_client.samples import Sample


def _record_branch_creation_durations(neon_compare: NeonCompare, durs: list[float]):

@@ -45,9 +48,9 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
tenant, _ = env.create_tenant(
conf={
"gc_period": "5 s",
"gc_horizon": f"{4 * 1024 ** 2}",
"checkpoint_distance": f"{2 * 1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"gc_horizon": f"{4 * 1024**2}",
"checkpoint_distance": f"{2 * 1024**2}",
"compaction_target_size": f"{1024**2}",
"compaction_threshold": "2",
# set PITR interval to be small, so we can do GC
"pitr_interval": "5 s",

@@ -82,10 +85,10 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
env.create_branch(f"b{i + 1}", ancestor_branch_name=f"b{p}", tenant_id=tenant)
dur = timeit.default_timer() - timer

log.info(f"Creating branch b{i+1} took {dur}s")
log.info(f"Creating branch b{i + 1} took {dur}s")
branch_creation_durations.append(dur)

threads.append(threading.Thread(target=run_pgbench, args=(f"b{i+1}",), daemon=True))
threads.append(threading.Thread(target=run_pgbench, args=(f"b{i + 1}",), daemon=True))
threads[-1].start()

for thread in threads:
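The `{i+1}` to `{i + 1}` hunks above look like the formatter now normalizing whitespace inside f-string replacement fields; the rendered strings are unchanged. A trivial check:

    i = 1
    assert f"b{i+1}" == f"b{i + 1}" == "b2"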

@@ -2,13 +2,16 @@ from __future__ import annotations

import timeit
from pathlib import Path
from typing import TYPE_CHECKING

from fixtures.benchmark_fixture import PgBenchRunResult
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import fork_at_current_lsn

from performance.test_perf_pgbench import utc_now_timestamp

if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare

# -----------------------------------------------------------------------
# Start of `test_compare_child_and_root_*` tests
# -----------------------------------------------------------------------

@@ -1,10 +1,13 @@
from __future__ import annotations

import timeit
from typing import TYPE_CHECKING

import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.neon_fixtures import NeonEnvBuilder

if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder

# Run bulk tenant creation test.
#

@@ -2,6 +2,7 @@ from __future__ import annotations

import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.utils import shared_buffers_for_max_cu


#

@@ -20,7 +21,10 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)

timeline_id = env.create_branch("test_bulk_update")
tenant_id = env.initial_tenant
endpoint = env.endpoints.create_start("test_bulk_update")
# use shared_buffers size like in production for 8 CU compute
endpoint = env.endpoints.create_start(
"test_bulk_update", config_lines=[f"shared_buffers={shared_buffers_for_max_cu(8.0)}"]
)
cur = endpoint.connect().cursor()
cur.execute("set statement_timeout=0")


@@ -1,12 +1,15 @@
from __future__ import annotations

from contextlib import closing
from typing import TYPE_CHECKING

import pytest
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import wait_for_last_flush_lsn

if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare


#
# Test compaction and image layer creation performance.

@@ -3,13 +3,16 @@ from __future__ import annotations
import os
import threading
import time
from typing import TYPE_CHECKING

import pytest
from fixtures.compare_fixtures import PgCompare
from fixtures.pg_stats import PgStatTable

from performance.test_perf_pgbench import get_durations_matrix, get_scales_matrix

if TYPE_CHECKING:
from fixtures.compare_fixtures import PgCompare
from fixtures.pg_stats import PgStatTable


def get_seeds_matrix(default: int = 100):
seeds = os.getenv("TEST_PG_BENCH_SEEDS_MATRIX", default=str(default))

@@ -1,10 +1,13 @@
from __future__ import annotations

import datetime
from typing import TYPE_CHECKING

import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.neon_fixtures import NeonEnv

if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv


@pytest.mark.timeout(120)

@@ -1,9 +1,13 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import pytest
import requests
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin

if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin


# Just start and measure duration.

@@ -2,11 +2,13 @@ from __future__ import annotations

from contextlib import closing
from io import BufferedReader, RawIOBase
from typing import final
from typing import TYPE_CHECKING, final

from fixtures.compare_fixtures import PgCompare
from typing_extensions import override

if TYPE_CHECKING:
from fixtures.compare_fixtures import PgCompare


@final
class CopyTestData(RawIOBase):
Some files were not shown because too many files have changed in this diff.