Compare commits

..

2 Commits

Author SHA1 Message Date
Erik Grinaker
d492b1eec3 Fix doc comment typo 2025-04-28 15:19:05 +02:00
John Spray
600dca9e2e pageserver: add changed_bytes_from_parent consumption metric 2025-04-28 14:44:16 +02:00
63 changed files with 476 additions and 1232 deletions

View File

@@ -7,7 +7,7 @@ inputs:
type: boolean
required: false
default: false
aws-oidc-role-arn:
aws-oicd-role-arn:
description: 'OIDC role arn to interract with S3'
required: true
@@ -88,7 +88,7 @@ runs:
if: ${{ !cancelled() }}
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-duration-seconds: 3600 # 1 hour should be more than enough to upload report
# Potentially we could have several running build for the same key (for example, for the main branch), so we use improvised lock for this

View File

@@ -8,7 +8,7 @@ inputs:
unique-key:
description: 'string to distinguish different results in the same run'
required: true
aws-oidc-role-arn:
aws-oicd-role-arn:
description: 'OIDC role arn to interract with S3'
required: true
@@ -39,7 +39,7 @@ runs:
if: ${{ !cancelled() }}
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-duration-seconds: 3600 # 1 hour should be more than enough to upload report
- name: Upload test results

View File

@@ -15,7 +15,7 @@ inputs:
prefix:
description: "S3 prefix. Default is '${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false
aws-oidc-role-arn:
aws-oicd-role-arn:
description: 'OIDC role arn to interract with S3'
required: true
@@ -25,7 +25,7 @@ runs:
- uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-duration-seconds: 3600
- name: Download artifact

View File

@@ -53,7 +53,7 @@ inputs:
description: 'benchmark durations JSON'
required: false
default: '{}'
aws-oidc-role-arn:
aws-oicd-role-arn:
description: 'OIDC role arn to interract with S3'
required: true
@@ -66,7 +66,7 @@ runs:
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build_type }}${{ inputs.sanitizers == 'enabled' && '-sanitized' || '' }}-artifact
path: /tmp/neon
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
- name: Download Neon binaries for the previous release
if: inputs.build_type != 'remote'
@@ -75,7 +75,7 @@ runs:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build_type }}-artifact
path: /tmp/neon-previous
prefix: latest
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
- name: Download compatibility snapshot
if: inputs.build_type != 'remote'
@@ -87,7 +87,7 @@ runs:
# The lack of compatibility snapshot (for example, for the new Postgres version)
# shouldn't fail the whole job. Only relevant test should fail.
skip-if-does-not-exist: true
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
- name: Checkout
if: inputs.needs_postgres_source == 'true'
@@ -228,13 +228,13 @@ runs:
# The lack of compatibility snapshot shouldn't fail the job
# (for example if we didn't run the test for non build-and-test workflow)
skip-if-does-not-exist: true
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
- uses: aws-actions/configure-aws-credentials@v4
if: ${{ !cancelled() }}
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-duration-seconds: 3600 # 1 hour should be more than enough to upload report
- name: Upload test results
@@ -243,4 +243,4 @@ runs:
with:
report-dir: /tmp/test_output/allure/results
unique-key: ${{ inputs.build_type }}-${{ inputs.pg_version }}-${{ runner.arch }}
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}

View File

@@ -14,11 +14,11 @@ runs:
name: coverage-data-artifact
path: /tmp/coverage
skip-if-does-not-exist: true # skip if there's no previous coverage to download
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
- name: Upload coverage data
uses: ./.github/actions/upload
with:
name: coverage-data-artifact
path: /tmp/coverage
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}

View File

@@ -14,7 +14,7 @@ inputs:
prefix:
description: "S3 prefix. Default is '${GITHUB_SHA}/${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false
aws-oidc-role-arn:
aws-oicd-role-arn:
description: "the OIDC role arn for aws auth"
required: false
default: ""
@@ -61,7 +61,7 @@ runs:
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-duration-seconds: 3600
- name: Upload artifact

View File

@@ -41,7 +41,7 @@ echo "Merge base of ${MAIN_BRANCH} and ${RELEASE_BRANCH}: ${MERGE_BASE}"
LAST_COMMIT=$(git rev-parse HEAD)
MERGE_COMMIT_MESSAGE=$(git log -1 --format=%s "${LAST_COMMIT}")
EXPECTED_MESSAGE_REGEX="^$COMPONENT release [0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2} UTC$"
EXPECTED_MESSAGE_REGEX="^$COMPONENT release [0-9]{4}-[0-9]{2}-[0-9]{2}$"
if ! [[ "${MERGE_COMMIT_MESSAGE}" =~ ${EXPECTED_MESSAGE_REGEX} ]]; then
report_error "Merge commit message does not match expected pattern: '<component> release YYYY-MM-DD'

View File

@@ -81,7 +81,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# we create a table that has one row for each database that we want to restore with the status whether the restore is done
- name: Create benchmark_restore_status table if it does not exist

View File

@@ -323,7 +323,7 @@ jobs:
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}${{ inputs.sanitizers == 'enabled' && '-sanitized' || '' }}-artifact
path: /tmp/neon
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Check diesel schema
if: inputs.build-type == 'release' && inputs.arch == 'x64'
@@ -394,7 +394,7 @@ jobs:
rerun_failed: ${{ inputs.test-run-count == 1 }}
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.

103
.github/workflows/_create-release-pr.yml vendored Normal file
View File

@@ -0,0 +1,103 @@
name: Create Release PR
on:
workflow_call:
inputs:
component-name:
description: 'Component name'
required: true
type: string
source-branch:
description: 'Source branch'
required: true
type: string
secrets:
ci-access-token:
description: 'CI access token'
required: true
defaults:
run:
shell: bash -euo pipefail {0}
permissions:
contents: read
jobs:
create-release-branch:
runs-on: ubuntu-22.04
permissions:
contents: write # for `git push`
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
ref: ${{ inputs.source-branch }}
fetch-depth: 0
- name: Set variables
id: vars
env:
COMPONENT_NAME: ${{ inputs.component-name }}
RELEASE_BRANCH: >-
${{
false
|| inputs.component-name == 'Storage' && 'release'
|| inputs.component-name == 'Proxy' && 'release-proxy'
|| inputs.component-name == 'Compute' && 'release-compute'
}}
run: |
now_date=$(date -u +'%Y-%m-%d')
now_time=$(date -u +'%H-%M-%Z')
{
echo "title=${COMPONENT_NAME} release ${now_date}"
echo "rc-branch=rc/${RELEASE_BRANCH}/${now_date}_${now_time}"
echo "release-branch=${RELEASE_BRANCH}"
} | tee -a ${GITHUB_OUTPUT}
- name: Configure git
run: |
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
- name: Create RC branch
env:
RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }}
RC_BRANCH: ${{ steps.vars.outputs.rc-branch }}
TITLE: ${{ steps.vars.outputs.title }}
run: |
git switch -c "${RC_BRANCH}"
# Manually create a merge commit on the current branch, keeping the
# tree and setting the parents to the current HEAD and the HEAD of the
# release branch. This commit is what we'll fast-forward the release
# branch to when merging the release branch.
# For details on why, look at
# https://docs.neon.build/overview/repositories/neon.html#background-on-commit-history-of-release-prs
current_tree=$(git rev-parse 'HEAD^{tree}')
release_head=$(git rev-parse "origin/${RELEASE_BRANCH}")
current_head=$(git rev-parse HEAD)
merge_commit=$(git commit-tree -p "${current_head}" -p "${release_head}" -m "${TITLE}" "${current_tree}")
# Fast-forward the current branch to the newly created merge_commit
git merge --ff-only ${merge_commit}
git push origin "${RC_BRANCH}"
- name: Create a PR into ${{ steps.vars.outputs.release-branch }}
env:
GH_TOKEN: ${{ secrets.ci-access-token }}
RC_BRANCH: ${{ steps.vars.outputs.rc-branch }}
RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }}
TITLE: ${{ steps.vars.outputs.title }}
run: |
gh pr create --title "${TITLE}" \
--body "" \
--head "${RC_BRANCH}" \
--base "${RELEASE_BRANCH}"

View File

@@ -114,7 +114,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
id: create-neon-project
@@ -132,7 +132,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
@@ -165,7 +165,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -222,8 +222,8 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Verify that cumulative statistics are preserved
uses: ./.github/actions/run-python-test-set
with:
@@ -233,7 +233,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 3600
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -282,7 +282,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run Logical Replication benchmarks
uses: ./.github/actions/run-python-test-set
@@ -293,7 +293,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -310,7 +310,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -322,7 +322,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
@@ -505,7 +505,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
if: contains(fromJSON('["neonvm-captest-new", "neonvm-captest-new-many-tables", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
@@ -557,7 +557,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_perf_many_relations
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -573,7 +573,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_init
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -588,7 +588,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_simple_update
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -603,7 +603,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_select_only
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -621,7 +621,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -694,7 +694,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Set up Connection String
id: set-up-connstr
@@ -726,7 +726,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgvector_indexing
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -741,7 +741,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -752,7 +752,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -828,7 +828,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Set up Connection String
id: set-up-connstr
@@ -871,7 +871,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 43200 -k test_clickbench
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -885,7 +885,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -954,7 +954,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Get Connstring Secret Name
run: |
@@ -1003,7 +1003,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_tpch
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -1015,7 +1015,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -1078,7 +1078,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Set up Connection String
id: set-up-connstr
@@ -1121,7 +1121,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_user_examples
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -1132,7 +1132,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}

View File

@@ -93,7 +93,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_DEV }}

View File

@@ -69,7 +69,7 @@ jobs:
submodules: true
- name: Check for file changes
uses: step-security/paths-filter@v3
uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 # v3.0.2
id: files-changed
with:
token: ${{ secrets.GITHUB_TOKEN }}
@@ -317,7 +317,7 @@ jobs:
extra_params: --splits 5 --group ${{ matrix.pytest_split_group }}
benchmark_durations: ${{ needs.get-benchmarks-durations.outputs.json }}
pg_version: v16
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -384,7 +384,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
@@ -451,14 +451,14 @@ jobs:
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ matrix.build_type }}-artifact
path: /tmp/neon
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Get coverage artifact
uses: ./.github/actions/download
with:
name: coverage-data-artifact
path: /tmp/coverage
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Merge coverage data
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge

View File

@@ -117,7 +117,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -68,7 +68,7 @@ jobs:
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
region_id: ${{ inputs.region_id || 'aws-us-east-2' }}
region_id: ${{ inputs.region_id }}
postgres_version: ${{ matrix.pg-version }}
project_settings: ${{ steps.project-settings.outputs.settings }}
# We need these settings to get the expected output results.

View File

@@ -89,7 +89,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create a new branch
id: create-branch
@@ -105,7 +105,7 @@ jobs:
test_selection: cloud_regress
pg_version: ${{matrix.pg-version}}
extra_params: -m remote_cluster
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{steps.create-branch.outputs.dsn}}
@@ -122,7 +122,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}

View File

@@ -32,7 +32,7 @@ jobs:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
- target_project: new_empty_project_stripe_size_2048
- target_project: new_empty_project_stripe_size_2048
stripe_size: 2048 # 16 MiB
postgres_version: 16
disable_sharding: false
@@ -98,7 +98,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}
@@ -110,10 +110,10 @@ jobs:
compute_units: '[7, 7]' # we want to test large compute here to avoid compute-side bottleneck
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
shard_split_project: ${{ matrix.stripe_size != null && 'true' || 'false' }}
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
shard_count: 8
stripe_size: ${{ matrix.stripe_size }}
disable_sharding: ${{ matrix.disable_sharding }}
disable_sharding: ${{ matrix.disable_sharding }}
- name: Initialize Neon project
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}
@@ -171,7 +171,7 @@ jobs:
extra_params: -s -m remote_cluster --timeout 86400 -k test_ingest_performance_using_pgcopydb
pg_version: v${{ matrix.postgres_version }}
save_perf_report: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_INGEST_SOURCE_CONNSTR: ${{ secrets.BENCHMARK_INGEST_SOURCE_CONNSTR }}
TARGET_PROJECT_TYPE: ${{ matrix.target_project }}

View File

@@ -33,9 +33,9 @@ jobs:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
- target: new_branch
- target: new_branch
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
- target: reuse_branch
- target: reuse_branch
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
permissions:
@@ -43,7 +43,7 @@ jobs:
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "1h" # todo update to > 1 h
TEST_PG_BENCH_DURATIONS_MATRIX: "1h" # todo update to > 1 h
TEST_PGBENCH_CUSTOM_SCRIPTS: ${{ matrix.custom_scripts }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
PG_VERSION: 16 # pre-determined by pre-determined project
@@ -85,7 +85,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Branch for large tenant
if: ${{ matrix.target == 'new_branch' }}
@@ -129,7 +129,7 @@ jobs:
${PSQL} "${BENCHMARK_CONNSTR}" -c "SET statement_timeout = 0; DELETE FROM webhook.incoming_webhooks WHERE created_at > '2025-02-27 23:59:59+00';"
echo "$(date '+%Y-%m-%d %H:%M:%S') - Finished deleting rows in table webhook.incoming_webhooks from prior runs"
- name: Benchmark pgbench with custom-scripts
- name: Benchmark pgbench with custom-scripts
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
@@ -138,7 +138,7 @@ jobs:
save_perf_report: true
extra_params: -m remote_cluster --timeout 7200 -k test_perf_oltp_large_tenant_pgbench
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -153,7 +153,7 @@ jobs:
save_perf_report: true
extra_params: -m remote_cluster --timeout 172800 -k test_perf_oltp_large_tenant_maintenance
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr_without_pooler }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -179,8 +179,8 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1

View File

@@ -53,7 +53,7 @@ jobs:
submodules: true
- name: Check for Postgres changes
uses: step-security/paths-filter@v3
uses: dorny/paths-filter@1441771bbfdd59dcd748680ee64ebd8faab1a242 #v3
id: files_changed
with:
token: ${{ github.token }}

View File

@@ -147,7 +147,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}

View File

@@ -103,7 +103,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
id: create-neon-project
@@ -122,7 +122,7 @@ jobs:
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
@@ -139,7 +139,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
@@ -178,7 +178,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
id: create-neon-project
@@ -195,7 +195,7 @@ jobs:
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
@@ -212,7 +212,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -66,7 +66,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run tests
uses: ./.github/actions/run-python-test-set
@@ -76,7 +76,7 @@ jobs:
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ matrix.pg-version }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
RANDOM_SEED: ${{ inputs.random_seed }}
@@ -88,6 +88,6 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -1,12 +0,0 @@
name: Create compute release PR
on:
schedule:
- cron: '0 7 * * FRI'
jobs:
create-release-pr:
uses: ./.github/workflows/release.yml
with:
component: compute
secrets: inherit

View File

@@ -1,12 +0,0 @@
name: Create proxy release PR
on:
schedule:
- cron: '0 6 * * TUE'
jobs:
create-release-pr:
uses: ./.github/workflows/release.yml
with:
component: proxy
secrets: inherit

View File

@@ -1,12 +0,0 @@
name: Create storage release PR
on:
schedule:
- cron: '0 6 * * FRI'
jobs:
create-release-pr:
uses: ./.github/workflows/release.yml
with:
component: storage
secrets: inherit

View File

@@ -1,34 +1,25 @@
name: Create release PR
name: Create Release Branch
on:
schedule:
# It should be kept in sync with if-condition in jobs
- cron: '0 6 * * TUE' # Proxy release
- cron: '0 6 * * FRI' # Storage release
- cron: '0 7 * * FRI' # Compute release
workflow_dispatch:
inputs:
component:
description: "Component to release"
required: true
type: choice
options:
- compute
- proxy
- storage
cherry-pick:
description: "Commits to cherry-pick (space separated, makes this a hotfix based on previous release)"
create-storage-release-branch:
type: boolean
description: 'Create Storage release PR'
required: false
type: string
default: ''
workflow_call:
inputs:
component:
description: "Component to release"
required: true
type: string
cherry-pick:
description: "Commits to cherry-pick (space separated, makes this a hotfix based on previous release)"
create-proxy-release-branch:
type: boolean
description: 'Create Proxy release PR'
required: false
create-compute-release-branch:
type: boolean
description: 'Create Compute release PR'
required: false
type: string
default: ''
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
@@ -38,31 +29,41 @@ defaults:
shell: bash -euo pipefail {0}
jobs:
create-release-pr:
runs-on: ubuntu-22.04
create-storage-release-branch:
if: ${{ github.event.schedule == '0 6 * * FRI' || inputs.create-storage-release-branch }}
permissions:
contents: write
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
uses: ./.github/workflows/_create-release-pr.yml
with:
component-name: 'Storage'
source-branch: ${{ github.ref_name }}
secrets:
ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }}
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
create-proxy-release-branch:
if: ${{ github.event.schedule == '0 6 * * TUE' || inputs.create-proxy-release-branch }}
- name: Configure git
run: |
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
permissions:
contents: write
- name: Create release PR
uses: neondatabase/dev-actions/release-pr@290dec821d86fa8a93f019e8c69720f5865b5677
with:
component: ${{ inputs.component }}
cherry-pick: ${{ inputs.cherry-pick }}
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
uses: ./.github/workflows/_create-release-pr.yml
with:
component-name: 'Proxy'
source-branch: ${{ github.ref_name }}
secrets:
ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }}
create-compute-release-branch:
if: ${{ github.event.schedule == '0 7 * * FRI' || inputs.create-compute-release-branch }}
permissions:
contents: write
uses: ./.github/workflows/_create-release-pr.yml
with:
component-name: 'Compute'
source-branch: ${{ github.ref_name }}
secrets:
ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }}

View File

@@ -1305,8 +1305,8 @@ ARG PG_VERSION
# Do not update without approve from proxy team
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
WORKDIR /ext-src
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/heads/alexk/tmp-disable-audit.tar.gz -O pg_session_jwt.tar.gz && \
echo "b868227950973df5186af54dfa03617536d21e96dddcb7b25e7ce199b4956bdb pg_session_jwt.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \

View File

@@ -22,7 +22,7 @@ commands:
- name: local_proxy
user: postgres
sysvInitAction: respawn
shell: 'RUST_LOG="error" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn

View File

@@ -22,7 +22,7 @@ commands:
- name: local_proxy
user: postgres
sysvInitAction: respawn
shell: 'RUST_LOG="error" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn

View File

@@ -112,7 +112,7 @@ impl SafekeeperNode {
}
/// Initializes a safekeeper node by creating all necessary files,
/// e.g. SSL certificates and JWT token file.
/// e.g. SSL certificates.
pub fn initialize(&self) -> anyhow::Result<()> {
if self.env.generate_local_ssl_certs {
self.env.generate_ssl_cert(
@@ -120,17 +120,6 @@ impl SafekeeperNode {
&self.datadir_path().join("server.key"),
)?;
}
// Generate a token file for authentication with other safekeepers
if self.conf.auth_enabled {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
let token_path = self.datadir_path().join("peer_jwt_token");
std::fs::write(token_path, token)?;
}
Ok(())
}
@@ -229,26 +218,14 @@ impl SafekeeperNode {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}
if self.conf.auth_enabled {
let token_path = self.datadir_path().join("peer_jwt_token");
let token_path_str = token_path
.to_str()
.with_context(|| {
format!("Token path {token_path:?} cannot be represented as a unicode string")
})?
.to_owned();
args.extend(["--auth-token-path".to_owned(), token_path_str]);
}
args.extend_from_slice(extra_opts);
let env_variables = Vec::new();
background_process::start_process(
&format!("safekeeper-{id}"),
&datadir,
&self.env.safekeeper_bin(),
&args,
env_variables,
self.safekeeper_env_variables()?,
background_process::InitialPidFile::Expect(self.pid_file()),
retry_timeout,
|| async {
@@ -262,6 +239,18 @@ impl SafekeeperNode {
.await
}
fn safekeeper_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
// Generate a token to connect from safekeeper to peers
if self.conf.auth_enabled {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
Ok(vec![("SAFEKEEPER_AUTH_TOKEN".to_owned(), token)])
} else {
Ok(Vec::new())
}
}
///
/// Stop the server.
///

View File

@@ -3,5 +3,3 @@ pg_distrib_dir='/usr/local/'
listen_pg_addr='0.0.0.0:6400'
listen_http_addr='0.0.0.0:9898'
remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region='eu-north-1', prefix_in_bucket='/pageserver' }
control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address
control_plane_emergency_mode=true

View File

@@ -77,9 +77,7 @@ impl StorageModel {
}
SizeResult {
// If total_size is 0, it means that the tenant has all timelines offloaded; we need to report 1
// here so that the data point shows up in the s3 files.
total_size: total_size.max(1),
total_size,
segments: segment_results,
}
}

View File

@@ -504,7 +504,7 @@ fn start_pageserver(
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
StorageControllerUpcallClient::new(conf, &shutdown_pageserver),
StorageControllerUpcallClient::new(conf, &shutdown_pageserver)?,
conf,
);
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());

View File

@@ -150,7 +150,7 @@ pub struct PageServerConf {
/// not terrible.
pub background_task_maximum_delay: Duration,
pub control_plane_api: Url,
pub control_plane_api: Option<Url>,
/// JWT token for use with the control plane API.
pub control_plane_api_token: Option<SecretString>,
@@ -438,8 +438,7 @@ impl PageServerConf {
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
control_plane_api: control_plane_api
.ok_or_else(|| anyhow::anyhow!("`control_plane_api` must be set"))?,
control_plane_api,
control_plane_emergency_mode,
heatmap_upload_concurrency,
secondary_download_concurrency,
@@ -574,7 +573,6 @@ impl PageServerConf {
background_task_maximum_delay: Duration::ZERO,
load_previous_heatmap: Some(true),
generate_unarchival_heatmap: Some(true),
control_plane_api: Some(Url::parse("http://localhost:6666").unwrap()),
..Default::default()
};
PageServerConf::parse_and_validate(NodeId(0), config_toml, &repo_dir).unwrap()
@@ -643,12 +641,9 @@ mod tests {
use super::PageServerConf;
#[test]
fn test_minimal_config_toml_is_valid() {
// The minimal valid config for running a pageserver:
// - control_plane_api is mandatory, as pageservers cannot run in isolation
// - we use Default impl of everything else in this situation
fn test_empty_config_toml_is_valid() {
// we use Default impl of everything in this situation
let input = r#"
control_plane_api = "http://localhost:6666"
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("empty config is valid");

View File

@@ -27,6 +27,9 @@ pub(super) enum Name {
/// Timeline logical size
#[serde(rename = "timeline_logical_size")]
LogicalSize,
/// Timeline delta from parent (WAL bytes clamped to logical size)
#[serde(rename = "timeline_changed_bytes_from_parent")]
ChangedBytesFromParent,
/// Tenant remote size
#[serde(rename = "remote_storage_size")]
RemoteSize,
@@ -175,6 +178,24 @@ impl MetricsKey {
.absolute_values()
}
/// [`Timeline::get_last_record_lsn`] - [`Timeline::get_ancestor_lsn`], clamped to
/// [`Timeline::get_current_logical_size`].
///
/// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
/// [`Timeline::get_ancestor_lsn`]: crate::tenant::Timeline::get_ancestor_lsn
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
const fn timeline_changed_bytes_from_parent(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::ChangedBytesFromParent,
}
.absolute_values()
}
/// [`TenantShard::remote_size`]
///
/// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
@@ -371,6 +392,7 @@ struct TimelineSnapshot {
loaded_at: (Lsn, SystemTime),
last_record_lsn: Lsn,
current_exact_logical_size: Option<u64>,
changed_bytes_from_parent: Option<u64>,
}
impl TimelineSnapshot {
@@ -406,10 +428,22 @@ impl TimelineSnapshot {
}
};
// This is an approximation of how much data has changed on this branch vs. its
// ancestor: the number of bytes written to the WAL, clamped to the size of the branch.
let changed_bytes_from_parent = current_exact_logical_size.and_then(|size| {
if t.get_ancestor_lsn() == Lsn::MAX {
return None;
}
t.get_last_record_lsn()
.checked_sub(t.get_ancestor_lsn())
.map(|wal_bytes| wal_bytes.0.min(size))
});
Ok(Some(TimelineSnapshot {
loaded_at,
last_record_lsn,
current_exact_logical_size,
changed_bytes_from_parent,
}))
}
}
@@ -487,6 +521,17 @@ impl TimelineSnapshot {
metrics.push(factory.at(now, size));
}
}
{
let factory = MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id);
let current_or_previous = self
.changed_bytes_from_parent
.or_else(|| cache.get(factory.key()).map(|item| item.value));
if let Some(size) = current_or_previous {
metrics.push(factory.at(now, size));
}
}
}
}

View File

@@ -18,6 +18,7 @@ fn startup_collected_timeline_metrics_before_advancing() {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
changed_bytes_from_parent: Some(0x1000),
};
let now = DateTime::<Utc>::from(SystemTime::now());
@@ -33,7 +34,8 @@ fn startup_collected_timeline_metrics_before_advancing() {
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000),
MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id).at(now, 0x1000)
]
);
}
@@ -60,6 +62,7 @@ fn startup_collected_timeline_metrics_second_round() {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
changed_bytes_from_parent: Some(0x1000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
@@ -69,7 +72,8 @@ fn startup_collected_timeline_metrics_second_round() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000),
MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id).at(now, 0x1000)
]
);
}
@@ -104,6 +108,7 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
changed_bytes_from_parent: Some(0x1000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
@@ -113,7 +118,8 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000),
MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id).at(now, 0x1000)
]
);
}
@@ -141,6 +147,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
changed_bytes_from_parent: None,
};
let mut cache = HashMap::from([
@@ -202,6 +209,7 @@ fn post_restart_current_exact_logical_size_uses_cached() {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
changed_bytes_from_parent: Some(0x1000),
};
let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)

View File

@@ -58,8 +58,14 @@ pub trait StorageControllerUpcallApi {
impl StorageControllerUpcallClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Self {
let mut url = conf.control_plane_api.clone();
pub fn new(
conf: &'static PageServerConf,
cancel: &CancellationToken,
) -> Result<Option<Self>, reqwest::Error> {
let mut url = match conf.control_plane_api.as_ref() {
Some(u) => u.clone(),
None => return Ok(None),
};
if let Ok(mut segs) = url.path_segments_mut() {
// This ensures that `url` ends with a slash if it doesn't already.
@@ -79,17 +85,15 @@ impl StorageControllerUpcallClient {
}
for cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(
Certificate::from_der(cert.contents()).expect("Invalid certificate in config"),
);
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
}
Self {
http_client: client.build().expect("Failed to construct HTTP client"),
Ok(Some(Self {
http_client: client.build()?,
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),
}
}))
}
#[tracing::instrument(skip_all)]

View File

@@ -585,7 +585,7 @@ impl DeletionQueue {
/// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
pub fn new<C>(
remote_storage: GenericRemoteStorage,
controller_upcall_client: C,
controller_upcall_client: Option<C>,
conf: &'static PageServerConf,
) -> (Self, DeletionQueueWorkers<C>)
where
@@ -701,7 +701,7 @@ mod test {
async fn restart(&mut self) {
let (deletion_queue, workers) = DeletionQueue::new(
self.storage.clone(),
self.mock_control_plane.clone(),
Some(self.mock_control_plane.clone()),
self.harness.conf,
);
@@ -821,8 +821,11 @@ mod test {
let mock_control_plane = MockStorageController::new();
let (deletion_queue, worker) =
DeletionQueue::new(storage.clone(), mock_control_plane.clone(), harness.conf);
let (deletion_queue, worker) = DeletionQueue::new(
storage.clone(),
Some(mock_control_plane.clone()),
harness.conf,
);
let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());

View File

@@ -53,7 +53,7 @@ where
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
// Client for calling into control plane API for validation of deletes
controller_upcall_client: C,
controller_upcall_client: Option<C>,
// DeletionLists which are waiting generation validation. Not safe to
// execute until [`validate`] has processed them.
@@ -86,7 +86,7 @@ where
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
controller_upcall_client: C,
controller_upcall_client: Option<C>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
cancel: CancellationToken,
) -> Self {
@@ -137,16 +137,20 @@ where
return Ok(());
}
let tenants_valid = match self
.controller_upcall_client
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
.await
{
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
// The only way a validation call returns an error is when the cancellation token fires
return Err(DeletionQueueError::ShuttingDown);
let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client {
match controller_upcall_client
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
.await
{
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
// The only way a validation call returns an error is when the cancellation token fires
return Err(DeletionQueueError::ShuttingDown);
}
}
} else {
// Control plane API disabled. In legacy mode we consider everything valid.
tenant_generations.keys().map(|k| (*k, true)).collect()
};
let mut validated_sequence: Option<u64> = None;

View File

@@ -1774,12 +1774,8 @@ static SMGR_QUERY_STARTED_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|
.expect("failed to define a metric")
});
/// Per-timeline smgr histogram buckets should be the same as the compute buckets, such that the
/// metrics are comparable across compute and Pageserver. See also:
/// <https://github.com/neondatabase/neon/blob/1a87975d956a8ad17ec8b85da32a137ec4893fcc/pgxn/neon/neon_perf_counters.h#L18-L27>
/// <https://github.com/neondatabase/flux-fleet/blob/556182a939edda87ff1d85a6b02e5cec901e0e9e/apps/base/compute-metrics/scrape-compute-sql-exporter.yaml#L29-L35>
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS: &[f64] =
&[0.0006, 0.001, 0.003, 0.006, 0.01, 0.03, 0.1, 1.0, 3.0];
// Alias so all histograms recording per-timeline smgr timings use the same buckets.
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS: &[f64] = CRITICAL_OP_BUCKETS;
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(

View File

@@ -1084,17 +1084,8 @@ impl Timeline {
let mut result = HashMap::new();
for (k, v) in kv {
let v = v?;
if v.is_empty() {
// This is a tombstone -- we can skip it.
// Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. However, as it part of
// the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
// we also need to consider that. Such tombstones might be written on the detach ancestor code path to
// avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
continue;
}
let origin_id = k.field6 as RepOriginId;
let origin_lsn = Lsn::des(&v)
.with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
let origin_lsn = Lsn::des(&v).unwrap();
if origin_lsn != Lsn::INVALID {
result.insert(origin_id, origin_lsn);
}
@@ -2587,11 +2578,6 @@ impl DatadirModification<'_> {
}
}
#[cfg(test)]
pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
self.put(key, val);
}
fn put(&mut self, key: Key, val: Value) {
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)

View File

@@ -4254,7 +4254,9 @@ impl TenantShard {
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
) -> TenantShard {
assert!(!attached_conf.location.generation.is_none());
debug_assert!(
!attached_conf.location.generation.is_none() || conf.control_plane_api.is_none()
);
let (state, mut rx) = watch::channel(state);
@@ -5947,9 +5949,7 @@ mod tests {
use itertools::Itertools;
#[cfg(feature = "testing")]
use models::CompactLsnRange;
use pageserver_api::key::{
AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX, repl_origin_key,
};
use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
use pageserver_api::keyspace::KeySpace;
#[cfg(feature = "testing")]
use pageserver_api::keyspace::KeySpaceRandomAccum;
@@ -8185,54 +8185,6 @@ mod tests {
assert_eq!(files.get("pg_logical/mappings/test2"), None);
}
#[tokio::test]
async fn test_repl_origin_tombstones() {
let harness = TenantHarness::create("test_repl_origin_tombstones")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
let repl_lsn = Lsn(0x10);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification.put_for_unit_test(repl_origin_key(2), Value::Image(Bytes::new()));
modification.set_replorigin(1, repl_lsn).await.unwrap();
modification.commit(&ctx).await.unwrap();
}
// we can read everything from the storage
let repl_origins = tline
.get_replorigins(lsn, &ctx, io_concurrency.clone())
.await
.unwrap();
assert_eq!(repl_origins.len(), 1);
assert_eq!(repl_origins[&1], lsn);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification.put_for_unit_test(
repl_origin_key(3),
Value::Image(Bytes::copy_from_slice(b"cannot_decode_this")),
);
modification.commit(&ctx).await.unwrap();
}
let result = tline
.get_replorigins(lsn, &ctx, io_concurrency.clone())
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_metadata_image_creation() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_image_creation").await?;

View File

@@ -346,8 +346,7 @@ async fn init_load_generations(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else {
let client = StorageControllerUpcallClient::new(conf, cancel);
} else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel)? {
info!("Calling {} API to re-attach tenants", client.base_url());
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach(conf).await {
@@ -361,6 +360,9 @@ async fn init_load_generations(
anyhow::bail!("Shut down while waiting for control plane re-attach response")
}
}
} else {
info!("Control plane API not configured, tenant generations are disabled");
return Ok(None);
};
// The deletion queue needs to know about the startup attachment state to decide which (if any) stored
@@ -1151,8 +1153,17 @@ impl TenantManager {
// Testing hack: if we are configured with no control plane, then drop the generation
// from upserts. This enables creating generation-less tenants even though neon_local
// always uses generations when calling the location conf API.
let attached_conf = AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?;
let attached_conf = if cfg!(feature = "testing") {
let mut conf = AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?;
if self.conf.control_plane_api.is_none() {
conf.location.generation = Generation::none();
}
conf
} else {
AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?
};
let tenant = tenant_spawn(
self.conf,

View File

@@ -178,7 +178,7 @@ impl Attempt {
}
}
pub(crate) async fn generate_tombstone_image_layer(
async fn generate_tombstone_image_layer(
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,

View File

@@ -163,7 +163,8 @@ pub async fn doit(
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
.expect("storcon configured");
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,

View File

@@ -36,8 +36,6 @@ DATA = \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \

View File

@@ -88,6 +88,9 @@ typedef PGAlignedBlock PGIOAlignedBlock;
page_server_api *page_server;
static uint32 local_request_counter;
#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
/*
* Various settings related to prompt (fast) handling of PageStream responses
* at any CHECK_FOR_INTERRUPTS point.
@@ -785,27 +788,6 @@ prefetch_read(PrefetchRequest *slot)
}
}
/*
* Wait completion of previosly registered prefetch request.
* Prefetch result should be placed in LFC by prefetch_wait_for.
*/
bool
communicator_prefetch_receive(BufferTag tag)
{
PrfHashEntry *entry;
PrefetchRequest hashkey;
hashkey.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
{
prefetch_set_unused(entry->slot->my_ring_index);
return true;
}
return false;
}
/*
* Disconnect hook - drop prefetches when the connection drops
*
@@ -924,6 +906,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
NeonGetPageRequest request = {
.hdr.tag = T_NeonGetPageRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
.forknum = slot->buftag.forkNum,
@@ -932,6 +915,8 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
slot->reqid = request.hdr.reqid;
if (force_request_lsns)
slot->request_lsns = *force_request_lsns;
else
@@ -949,7 +934,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
/* loop */
}
slot->reqid = request.hdr.reqid;
/* update prefetch state */
MyPState->n_requests_inflight += 1;
@@ -1953,6 +1937,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
{
NeonExistsRequest request = {
.hdr.tag = T_NeonExistsRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
@@ -2227,6 +2212,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
{
NeonNblocksRequest request = {
.hdr.tag = T_NeonNblocksRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
@@ -2299,6 +2285,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
{
NeonDbSizeRequest request = {
.hdr.tag = T_NeonDbSizeRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.dbNode = dbNode,
@@ -2366,6 +2353,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
request = (NeonGetSlruSegmentRequest) {
.hdr.tag = T_NeonGetSlruSegmentRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.kind = kind,

View File

@@ -37,8 +37,6 @@ extern int communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum,
BlockNumber nblocks, void **buffers, bits8 *mask);
extern void communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask);
extern bool communicator_prefetch_receive(BufferTag tag);
extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
neon_request_lsns *request_lsns,
void *buffer);

View File

@@ -25,7 +25,6 @@
#include "pgstat.h"
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include RELFILEINFO_HDR
#include "storage/buf_internals.h"
#include "storage/fd.h"
@@ -33,8 +32,6 @@
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
@@ -49,8 +46,6 @@
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#include "communicator.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
@@ -92,13 +87,14 @@
* 1Mb chunks can reduce hash map size to 320Mb.
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define MAX_BLOCKS_PER_CHUNK_LOG 7 /* 1Mb chunk */
#define MAX_BLOCKS_PER_CHUNK (1 << MAX_BLOCKS_PER_CHUNK_LOG)
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
/*
* Smaller chunk seems to be better for OLTP workload
*/
// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log))
#define BLOCK_TO_CHUNK_OFF(blkno) ((blkno) & (lfc_blocks_per_chunk-1))
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
/*
* Blocks are read or written to LFC file outside LFC critical section.
@@ -123,26 +119,16 @@ typedef struct FileCacheEntry
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 state[(BLOCKS_PER_CHUNK + 31) / 32 * 2]; /* two bits per block */
dlist_node list_node; /* LRU/holes list node */
uint32 state[FLEXIBLE_ARRAY_MEMBER]; /* two bits per block */
} FileCacheEntry;
#define FILE_CACHE_ENRTY_SIZE MAXALIGN(offsetof(FileCacheEntry, state) + (lfc_blocks_per_chunk*2+31)/32*4)
#define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3)
#define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2))
#define N_COND_VARS 64
#define CV_WAIT_TIMEOUT 10
#define MAX_PREWARM_WORKERS 8
typedef struct PrewarmWorkerState
{
uint32 prewarmed_pages;
uint32 skipped_pages;
TimestampTz completed;
} PrewarmWorkerState;
typedef struct FileCacheControl
{
uint64 generation; /* generation is needed to handle correct hash
@@ -150,7 +136,6 @@ typedef struct FileCacheControl
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
uint32 used_pages; /* number of used pages */
uint32 pinned; /* number of pinned chunks */
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
@@ -164,43 +149,23 @@ typedef struct FileCacheControl
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS];
size_t n_prewarm_workers;
size_t n_prewarm_entries;
size_t total_prewarm_pages;
size_t prewarm_batch;
bool prewarm_active;
bool prewarm_canceled;
dsm_handle prewarm_lfc_state_handle;
} FileCacheControl;
#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc
#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks])
#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8)
#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8)
bool lfc_store_prefetch_result;
static HTAB *lfc_hash;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static bool lfc_do_prewarm;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
bool lfc_store_prefetch_result;
bool lfc_prewarm_update_ws_estimation;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
/*
@@ -241,9 +206,7 @@ lfc_switch_off(void)
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->pinned = 0;
lfc_ctl->used = 0;
lfc_ctl->used_pages = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
@@ -333,7 +296,7 @@ lfc_shmem_startup(void)
lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag);
info.entrysize = FILE_CACHE_ENRTY_SIZE;
info.entrysize = sizeof(FileCacheEntry);
/*
* n_chunks+1 because we add new element to hash table before eviction
@@ -379,7 +342,7 @@ lfc_shmem_request(void)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, sizeof(FileCacheEntry)));
RequestNamedLWLockTranche("lfc_lock", 1);
}
@@ -396,24 +359,6 @@ is_normal_backend(void)
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
}
static bool
lfc_check_chunk_size(int *newval, void **extra, GucSource source)
{
if (*newval & (*newval - 1))
{
elog(ERROR, "LFC chunk size should be power of two");
return false;
}
return true;
}
static void
lfc_change_chunk_size(int newval, void* extra)
{
lfc_chunk_size_log = pg_ceil_log2_32(newval);
}
static bool
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
@@ -470,11 +415,11 @@ lfc_change_limit_hook(int newval, void *extra)
CriticalAssert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * lfc_blocks_per_chunk * BLCKSZ, lfc_blocks_per_chunk * BLCKSZ) < 0)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
neon_log(LOG, "Failed to punch hole in file: %m");
#endif
/* We remove the old entry, and re-enter a hole to the hash table */
for (int i = 0; i < lfc_blocks_per_chunk; i++)
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
bool is_page_cached = GET_STATE(victim, i) == AVAILABLE;
lfc_ctl->used_pages -= is_page_cached;
@@ -526,17 +471,6 @@ lfc_init(void)
NULL,
NULL);
DefineCustomBoolVariable("neon.prewarm_update_ws_estimation",
"Consider prewarmed pages for working set estimation",
NULL,
&lfc_prewarm_update_ws_estimation,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -574,45 +508,6 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_chunk_size",
"LFC chunk size in blocks (should be power of two)",
NULL,
&lfc_blocks_per_chunk,
MAX_BLOCKS_PER_CHUNK,
1,
MAX_BLOCKS_PER_CHUNK,
PGC_POSTMASTER,
GUC_UNIT_BLOCKS,
lfc_check_chunk_size,
lfc_change_chunk_size,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed chunks",
NULL,
&lfc_prewarm_limit,
INT_MAX, /* no limit by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -626,317 +521,6 @@ lfc_init(void)
#endif
}
FileCacheState*
lfc_get_state(size_t max_entries)
{
FileCacheState* fcs = NULL;
if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */
return NULL;
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
size_t i = 0;
uint8* bitmap;
size_t n_pages = 0;
size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned);
size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries);
fcs = (FileCacheState*)palloc0(state_size);
SET_VARSIZE(fcs, state_size);
fcs->magic = FILE_CACHE_STATE_MAGIC;
fcs->chunk_size_log = lfc_chunk_size_log;
fcs->n_chunks = n_entries;
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
fcs->chunks[i] = entry->key;
for (int j = 0; j < lfc_blocks_per_chunk; j++)
{
if (GET_STATE(entry, j) != UNAVAILABLE)
{
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
n_pages += 1;
}
}
if (++i == n_entries)
break;
}
Assert(i == n_entries);
fcs->n_pages = n_pages;
Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages);
elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages);
}
LWLockRelease(lfc_lock);
return fcs;
}
/*
* Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock
* and avoid race conditions with other backends.
*/
void
lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
{
size_t fcs_chunk_size_log;
size_t n_entries;
size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size);
size_t fcs_size;
dsm_segment *seg;
BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS];
if (!lfc_ensure_opened())
return;
if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
if (n_workers > MAX_PREWARM_WORKERS)
{
elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS);
}
if (fcs == NULL || fcs->n_chunks == 0)
{
elog(LOG, "LFC: nothing to prewarm");
return;
}
if (fcs->magic != FILE_CACHE_STATE_MAGIC)
{
elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
}
fcs_size = VARSIZE(fcs);
if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size)
{
elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs));
}
fcs_chunk_size_log = fcs->chunk_size_log;
if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG)
{
elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log);
}
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
Assert(n_entries != 0);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
elog(LOG, "LFC: skip prewarm because LFC is already filled");
LWLockRelease(lfc_lock);
return;
}
if (lfc_ctl->prewarm_active)
{
LWLockRelease(lfc_lock);
elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
}
lfc_ctl->n_prewarm_entries = n_entries;
lfc_ctl->n_prewarm_workers = n_workers;
lfc_ctl->prewarm_active = true;
lfc_ctl->prewarm_canceled = false;
lfc_ctl->prewarm_batch = prewarm_batch;
memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState));
LWLockRelease(lfc_lock);
/* Calculate total number of pages to be prewarmed */
lfc_ctl->total_prewarm_pages = fcs->n_pages;
seg = dsm_create(fcs_size, 0);
memcpy(dsm_segment_address(seg), fcs, fcs_size);
lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg);
/* Spawn background workers */
for (uint32 i = 0; i < n_workers; i++)
{
BackgroundWorker worker = {0};
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy(worker.bgw_library_name, "neon");
strcpy(worker.bgw_function_name, "lfc_prewarm_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "LFC prewarm worker %d", i+1);
strcpy(worker.bgw_type, "LFC prewarm worker");
worker.bgw_main_arg = Int32GetDatum(i);
/* must set notify PID to wait for shutdown */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &bgw_handle[i]))
{
ereport(LOG,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("LFC: registering dynamic bgworker prewarm failed"),
errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
n_workers = i;
lfc_ctl->prewarm_canceled = true;
break;
}
}
for (uint32 i = 0; i < n_workers; i++)
{
bool interrupted;
do
{
interrupted = false;
PG_TRY();
{
BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]);
if (status != BGWH_STOPPED && status != BGWH_POSTMASTER_DIED)
{
elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status);
}
}
PG_CATCH();
{
elog(LOG, "LFC: cancel prewarm");
lfc_ctl->prewarm_canceled = true;
interrupted = true;
}
PG_END_TRY();
} while (interrupted);
if (!lfc_ctl->prewarm_workers[i].completed)
{
/* Background worker doesn't set completion time: it means that it was abnormally terminated */
elog(LOG, "LFC: prewarm worker %d failed", i+1);
/* Set completion time to prevent get_prewarm_info from considering this worker as active */
lfc_ctl->prewarm_workers[i].completed = GetCurrentTimestamp();
}
}
dsm_detach(seg);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_ctl->prewarm_active = false;
LWLockRelease(lfc_lock);
}
void
lfc_prewarm_main(Datum main_arg)
{
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
size_t fcs_chunk_size_log;
size_t max_prefetch_pages;
size_t prewarm_batch;
size_t n_workers;
dsm_segment *seg;
FileCacheState* fcs;
uint8* bitmap;
BufferTag tag;
PrewarmWorkerState* ws;
uint32 worker_id = DatumGetInt32(main_arg);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle);
if (seg == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not map dynamic shared memory segment")));
fcs = (FileCacheState*) dsm_segment_address(seg);
prewarm_batch = lfc_ctl->prewarm_batch;
fcs_chunk_size_log = fcs->chunk_size_log;
n_workers = lfc_ctl->n_prewarm_workers;
max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log;
ws = &lfc_ctl->prewarm_workers[worker_id];
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
/* enable prefetch in LFC */
lfc_store_prefetch_result = true;
lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */
elog(LOG, "LFC: worker %d start prewarming", worker_id);
while (!lfc_ctl->prewarm_canceled)
{
if (snd_idx < max_prefetch_pages)
{
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* If there are multiple workers, split chunks between them */
snd_idx += 1 << fcs_chunk_size_log;
}
else
{
if (BITMAP_ISSET(bitmap, snd_idx))
{
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
{
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
n_sent += 1;
}
else
{
ws->skipped_pages += 1;
BITMAP_CLR(bitmap, snd_idx);
}
}
snd_idx += 1;
}
}
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages)
{
if (n_received == n_sent && snd_idx == max_prefetch_pages)
{
break;
}
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* Skip chunks processed by other workers */
rcv_idx += 1 << fcs_chunk_size_log;
continue;
}
/* Locate next block to prefetch */
while (!BITMAP_ISSET(bitmap, rcv_idx))
{
rcv_idx += 1;
}
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
if (communicator_prefetch_receive(tag))
{
ws->prewarmed_pages += 1;
}
else
{
ws->skipped_pages += 1;
}
rcv_idx += 1;
n_received += 1;
}
}
/* No need to perform prefetch cleanup here because prewarm worker will be terminated and
* connection to PS dropped just after return from this function.
*/
Assert(n_sent == n_received || lfc_ctl->prewarm_canceled);
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -946,7 +530,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
{
BufferTag tag;
FileCacheEntry *entry;
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
bool found = false;
uint32 hash;
@@ -955,7 +539,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = blkno - chunk_offs;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
@@ -993,9 +577,9 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
tag.blockNum = blkno - chunk_offs;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
LWLockAcquire(lfc_lock, LW_SHARED);
@@ -1006,12 +590,12 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
while (true)
{
int this_chunk = Min(nblocks - i, lfc_blocks_per_chunk - chunk_offs);
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
{
for (; chunk_offs < lfc_blocks_per_chunk && i < nblocks; chunk_offs++, i++)
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
{
if (GET_STATE(entry, chunk_offs) != UNAVAILABLE)
{
@@ -1035,9 +619,9 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* Prepare for the next iteration. We don't unlock here, as that'd
* probably be more expensive than the gains it'd get us.
*/
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno + i);
tag.blockNum = (blkno + i) - chunk_offs;
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
}
LWLockRelease(lfc_lock);
@@ -1112,9 +696,9 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
uint8 chunk_mask[MAX_BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs);
int8 chunk_mask[BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
@@ -1202,10 +786,8 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
generation = lfc_ctl->generation;
entry_offset = entry->offset;
@@ -1254,7 +836,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
/* chunk offset (# of pages) into the LFC file */
off_t first_read_offset = (off_t) entry_offset * lfc_blocks_per_chunk;
off_t first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK;
int nwrite = iov_last_used - first_block_in_chunk_read;
/* offset of first IOV */
first_read_offset += chunk_offs + first_block_in_chunk_read;
@@ -1302,10 +884,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
}
else
{
@@ -1375,17 +954,14 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
* If we can't (e.g. because all other slots are being accessed)
* then we will remove this entry from the hash and continue
* on to the next chunk, as we may not exceed the limit.
*
* While prewarming LFC we do not want to replace existed entries,
* so we just stop prewarm is LFC cache is full.
*/
else if (!dlist_is_empty(&lfc_ctl->lru) && !lfc_do_prewarm)
else if (!dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node,
dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < lfc_blocks_per_chunk; i++)
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
bool is_page_cached = GET_STATE(victim, i) == AVAILABLE;
lfc_ctl->used_pages -= is_page_cached;
@@ -1403,15 +979,14 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
/* Can't add this chunk - we don't have the space for it */
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
HASH_REMOVE, NULL);
lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */
return false;
}
entry->access_count = 1;
entry->hash = hash;
lfc_ctl->pinned += 1;
for (int i = 0; i < lfc_blocks_per_chunk; i++)
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
SET_STATE(entry, i, UNAVAILABLE);
return true;
@@ -1456,7 +1031,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
FileCacheBlockState state;
XLogRecPtr lwlsn;
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
@@ -1466,7 +1041,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = blkno - chunk_offs;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -1477,7 +1052,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LWLockRelease(lfc_lock);
return false;
}
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
if (lwlsn > lsn)
@@ -1490,11 +1065,9 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (lfc_prewarm_update_ws_estimation)
{
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (found)
{
state = GET_STATE(entry, chunk_offs);
@@ -1508,10 +1081,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* operation
*/
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
}
else
{
@@ -1536,7 +1106,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwrite(lfc_desc, buffer, BLCKSZ,
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
@@ -1562,10 +1132,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
inc_page_cache_write_wait(time_spent_us);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
state = GET_STATE(entry, chunk_offs);
if (state == REQUESTED) {
@@ -1632,8 +1199,8 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs);
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
instr_time io_start, io_end;
ConditionVariable* cv;
@@ -1645,7 +1212,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
iov[i].iov_len = BLCKSZ;
}
tag.blockNum = blkno - chunk_offs;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -1665,10 +1232,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* operation
*/
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
}
else
{
@@ -1721,7 +1285,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
@@ -1748,10 +1312,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
inc_page_cache_write_wait(time_spent_us);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
for (int i = 0; i < blocks_in_chunk; i++)
{
@@ -1877,12 +1438,7 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
break;
case 8:
key = "file_cache_chunk_size_pages";
value = lfc_blocks_per_chunk;
break;
case 9:
key = "file_cache_chunks_pinned";
if (lfc_ctl)
value = lfc_ctl->pinned;
value = BLOCKS_PER_CHUNK;
break;
default:
SRF_RETURN_DONE(funcctx);
@@ -2010,7 +1566,7 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Skip hole tags */
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
for (int i = 0; i < lfc_blocks_per_chunk; i++)
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
}
}
@@ -2038,13 +1594,13 @@ local_cache_pages(PG_FUNCTION_ARGS)
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < lfc_blocks_per_chunk; i++)
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
if (GET_STATE(entry, i) == AVAILABLE)
{
fctx->record[n].pageoffs = entry->offset * lfc_blocks_per_chunk + i;
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
@@ -2128,82 +1684,3 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum
get_local_cache_state(PG_FUNCTION_ARGS)
{
size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheState* fcs = lfc_get_state(max_entries);
if (fcs != NULL)
PG_RETURN_BYTEA_P((bytea*)fcs);
else
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(prewarm_local_cache);
Datum
prewarm_local_cache(PG_FUNCTION_ARGS)
{
bytea* state = PG_GETARG_BYTEA_PP(0);
uint32 n_workers = PG_GETARG_INT32(1);
FileCacheState* fcs = (FileCacheState*)state;
lfc_prewarm(fcs, n_workers);
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_prewarm_info);
Datum
get_prewarm_info(PG_FUNCTION_ARGS)
{
Datum values[4];
bool nulls[4];
TupleDesc tupdesc;
uint32 prewarmed_pages = 0;
uint32 skipped_pages = 0;
uint32 active_workers = 0;
uint32 total_pages;
size_t n_workers;
if (lfc_size_limit == 0)
PG_RETURN_NULL();
LWLockAcquire(lfc_lock, LW_SHARED);
if (!lfc_ctl || lfc_ctl->n_prewarm_workers == 0)
{
LWLockRelease(lfc_lock);
PG_RETURN_NULL();
}
n_workers = lfc_ctl->n_prewarm_workers;
total_pages = lfc_ctl->total_prewarm_pages;
for (size_t i = 0; i < n_workers; i++)
{
PrewarmWorkerState* ws = &lfc_ctl->prewarm_workers[i];
prewarmed_pages += ws->prewarmed_pages;
skipped_pages += ws->skipped_pages;
active_workers += ws->completed != 0;
}
LWLockRelease(lfc_lock);
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
MemSet(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(total_pages);
values[1] = Int32GetDatum(prewarmed_pages);
values[2] = Int32GetDatum(skipped_pages);
values[3] = Int32GetDatum(active_workers);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -13,17 +13,6 @@
#include "neon_pgversioncompat.h"
typedef struct FileCacheState
{
int32 vl_len_; /* varlena header (do not touch directly!) */
uint32 magic;
uint32 n_chunks;
uint32 n_pages;
uint16 chunk_size_log;
BufferTag chunks[FLEXIBLE_ARRAY_MEMBER];
/* followed by bitmap */
} FileCacheState;
/* GUCs */
extern bool lfc_store_prefetch_result;
@@ -43,10 +32,7 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
extern FileCacheState* lfc_get_state(size_t max_entries);
extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers);
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

View File

@@ -48,6 +48,7 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
enum NeonComputeMode {
CP_MODE_PRIMARY = 0,
CP_MODE_REPLICA,
@@ -166,9 +167,6 @@ typedef struct
WaitEventSet *wes_read;
} PageServer;
static uint32 local_request_counter;
#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);
@@ -736,8 +734,8 @@ pageserver_connect(shardno_t shard_no, int elevel)
default:
neon_shard_log(shard_no, ERROR, "libpagestore: invalid connection state %d", shard->state);
}
pg_unreachable();
/* This shouldn't be hit */
Assert(false);
}
static void
@@ -996,7 +994,6 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
pageserver_conn = NULL;
}
request->reqid = GENERATE_REQUEST_ID();
req_buff = nm_pack_request(request);
/*

View File

@@ -1,22 +0,0 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
CREATE FUNCTION get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer)
RETURNS record
AS 'MODULE_PATHNAME', 'get_prewarm_info'
LANGUAGE C STRICT
PARALLEL SAFE;
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_local_cache_state'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION prewarm_local_cache(state bytea, n_workers integer default 1)
RETURNS void
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;

View File

@@ -1,7 +0,0 @@
DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer);
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);

View File

@@ -396,7 +396,7 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
XLogRecPtr
neon_set_lwlsn_block_range(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber from, BlockNumber n_blocks)
{
if (lsn < FirstNormalUnloggedLSN || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
if (lsn == InvalidXLogRecPtr || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
return lsn;
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
@@ -505,5 +505,4 @@ neon_set_lwlsn_db(XLogRecPtr lsn)
{
NRelFileInfo dummyNode = {InvalidOid, InvalidOid, InvalidOid};
return neon_set_lwlsn_block(lsn, dummyNode, MAIN_FORKNUM, 0);
}
}

View File

@@ -65,6 +65,7 @@ typedef enum {
SLRU_MULTIXACT_OFFSETS
} SlruKind;
/*--
* supertype of all the Neon*Request structs below.
*
@@ -128,7 +129,6 @@ typedef struct
int segno;
} NeonGetSlruSegmentRequest;
/* supertype of all the Neon*Response structs below */
typedef NeonMessage NeonResponse;
@@ -187,7 +187,6 @@ typedef struct
{
/*
* Send this request to the PageServer associated with this shard.
* This function assigns request_id to the request which can be extracted by caller from request struct.
*/
bool (*send) (shardno_t shard_no, NeonRequest * request);
/*

View File

@@ -7,14 +7,13 @@ use std::{net::SocketAddr, sync::Arc};
use anyhow::{Context, anyhow, bail, ensure};
use clap::Arg;
use futures::TryFutureExt;
use futures::future::Either;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use rustls::crypto::ring;
use rustls::pki_types::{DnsName, PrivateKeyDer};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use rustls::pki_types::PrivateKeyDer;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
use tokio_rustls::TlsConnector;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info};
use utils::project_git_version;
@@ -39,12 +38,6 @@ fn cli() -> clap::Command {
.help("listen for incoming client connections on ip:port")
.default_value("127.0.0.1:4432"),
)
.arg(
Arg::new("listen-tls")
.long("listen-tls")
.help("listen for incoming client connections on ip:port, requiring TLS to compute")
.default_value("127.0.0.1:4433"),
)
.arg(
Arg::new("tls-key")
.short('k')
@@ -129,58 +122,31 @@ pub async fn run() -> anyhow::Result<()> {
_ => bail!("tls-key and tls-cert must be specified"),
};
let compute_tls_config =
Arc::new(crate::tls::client_config::compute_client_config_with_root_certs()?);
// Start listening for incoming client connections
let proxy_address: SocketAddr = args
.get_one::<String>("listen")
.expect("listen argument defined")
.expect("string argument defined")
.parse()?;
let proxy_address_compute_tls: SocketAddr = args
.get_one::<String>("listen-tls")
.expect("listen-tls argument defined")
.parse()?;
info!("Starting sni router on {proxy_address}");
info!("Starting sni router on {proxy_address_compute_tls}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let proxy_listener_compute_tls = TcpListener::bind(proxy_address_compute_tls).await?;
let cancellation_token = CancellationToken::new();
let dest = Arc::new(destination);
let main = tokio::spawn(task_main(
dest.clone(),
tls_config.clone(),
None,
Arc::new(destination),
tls_config,
tls_server_end_point,
proxy_listener,
cancellation_token.clone(),
))
.map(crate::error::flatten_err);
let main_tls = tokio::spawn(task_main(
dest,
tls_config,
Some(compute_tls_config),
tls_server_end_point,
proxy_listener_compute_tls,
cancellation_token.clone(),
))
.map(crate::error::flatten_err);
));
let signals_task = tokio::spawn(crate::signals::handle(cancellation_token, || {}));
// the signal task cant ever succeed.
// the main task can error, or can succeed on cancellation.
// we want to immediately exit on either of these cases
let main = futures::future::try_join(main, main_tls);
let signal = match futures::future::select(signals_task, main).await {
Either::Left((res, _)) => crate::error::flatten_err(res)?,
Either::Right((res, _)) => {
res?;
return Ok(());
}
Either::Right((res, _)) => return crate::error::flatten_err(res),
};
// maintenance tasks return `Infallible` success values, this is an impossible value
@@ -191,7 +157,6 @@ pub async fn run() -> anyhow::Result<()> {
async fn task_main(
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
compute_tls_config: Option<Arc<rustls::ClientConfig>>,
tls_server_end_point: TlsServerEndPoint,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
@@ -210,7 +175,6 @@ async fn task_main(
let session_id = uuid::Uuid::new_v4();
let tls_config = Arc::clone(&tls_config);
let dest_suffix = Arc::clone(&dest_suffix);
let compute_tls_config = compute_tls_config.clone();
connections.spawn(
async move {
@@ -228,15 +192,7 @@ async fn task_main(
crate::metrics::Protocol::SniRouter,
"sni",
);
handle_client(
ctx,
dest_suffix,
tls_config,
compute_tls_config,
tls_server_end_point,
socket,
)
.await
handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
@@ -312,7 +268,6 @@ async fn handle_client(
ctx: RequestContext,
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
compute_tls_config: Option<Arc<rustls::ClientConfig>>,
tls_server_end_point: TlsServerEndPoint,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
@@ -333,33 +288,7 @@ async fn handle_client(
info!("destination: {}", destination);
let mut client = tokio::net::TcpStream::connect(&destination).await?;
let client = if let Some(compute_tls_config) = compute_tls_config {
info!("upgrading TLS");
// send SslRequest
client
.write_all(b"\x00\x00\x00\x08\x04\xd2\x16\x2f")
.await?;
// wait for S/N respons
let mut resp = b'N';
client.read_exact(std::slice::from_mut(&mut resp)).await?;
// error if not S
ensure!(resp == b'S', "compute refused TLS");
// upgrade to TLS.
let domain = DnsName::try_from(destination)?;
let domain = rustls::pki_types::ServerName::DnsName(domain);
let client = TlsConnector::from(compute_tls_config)
.connect(domain, client)
.await?;
Connection::Tls(client)
} else {
Connection::Raw(client)
};
let mut client = tokio::net::TcpStream::connect(destination).await?;
// doesn't yet matter as pg-sni-router doesn't report analytics logs
ctx.set_success();
@@ -368,19 +297,9 @@ async fn handle_client(
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let res = match client {
Connection::Raw(mut c) => copy_bidirectional_client_compute(&mut tls_stream, &mut c).await,
Connection::Tls(mut c) => copy_bidirectional_client_compute(&mut tls_stream, &mut c).await,
};
match res {
match copy_bidirectional_client_compute(&mut tls_stream, &mut client).await {
Ok(_) => Ok(()),
Err(ErrorSource::Client(err)) => Err(err).context("client"),
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),
}
}
enum Connection {
Raw(tokio::net::TcpStream),
Tls(tokio_rustls::client::TlsStream<tokio::net::TcpStream>),
}

View File

@@ -1,6 +1,7 @@
//
// Main entry point for the safekeeper executable
//
use std::env::{VarError, var};
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::str::FromStr;
@@ -353,13 +354,29 @@ async fn main() -> anyhow::Result<()> {
};
// Load JWT auth token to connect to other safekeepers for pull_timeline.
let sk_auth_token = if let Some(auth_token_path) = args.auth_token_path.as_ref() {
info!("loading JWT token for authentication with safekeepers from {auth_token_path}");
let auth_token = tokio::fs::read_to_string(auth_token_path).await?;
Some(SecretString::from(auth_token.trim().to_owned()))
} else {
info!("no JWT token for authentication with safekeepers detected");
None
// First check if the env var is present, then check the arg with the path.
// We want to deprecate and remove the env var method in the future.
let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") {
Ok(v) => {
info!("loaded JWT token for authentication with safekeepers");
Some(SecretString::from(v))
}
Err(VarError::NotPresent) => {
if let Some(auth_token_path) = args.auth_token_path.as_ref() {
info!(
"loading JWT token for authentication with safekeepers from {auth_token_path}"
);
let auth_token = tokio::fs::read_to_string(auth_token_path).await?;
Some(SecretString::from(auth_token.trim().to_owned()))
} else {
info!("no JWT token for authentication with safekeepers detected");
None
}
}
Err(_) => {
warn!("JWT token for authentication with safekeepers is not unicode");
None
}
};
let ssl_ca_certs = match args.ssl_ca_file.as_ref() {

View File

@@ -3720,10 +3720,6 @@ impl Service {
// Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
// use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
// that will get the first creation request, and propagate the LSN to all the >0 shards.
//
// This also enables non-zero shards to use the initdb that shard 0 generated and uploaded to S3, rather than
// independently generating their own initdb. This guarantees that shards cannot end up with different initial
// states if e.g. they have different postgres binary versions.
let timeline_info = create_one(
shard_zero_tid,
shard_zero_locations,
@@ -3733,16 +3729,11 @@ impl Service {
)
.await?;
// Update the create request for shards >= 0
// Propagate the LSN that shard zero picked, if caller didn't provide one
match &mut create_req.mode {
models::TimelineCreateRequestMode::Branch { ancestor_start_lsn, .. } if ancestor_start_lsn.is_none() => {
// Propagate the LSN that shard zero picked, if caller didn't provide one
*ancestor_start_lsn = timeline_info.ancestor_lsn;
},
models::TimelineCreateRequestMode::Bootstrap { existing_initdb_timeline_id, .. } => {
// For shards >= 0, do not run initdb: use the one that shard 0 uploaded to S3
*existing_initdb_timeline_id = Some(create_req.new_timeline_id)
}
_ => {}
}

View File

@@ -1194,7 +1194,8 @@ class NeonEnv:
else:
cfg["broker"]["listen_addr"] = self.broker.listen_addr()
cfg["control_plane_api"] = self.control_plane_api
if self.control_plane_api is not None:
cfg["control_plane_api"] = self.control_plane_api
if self.control_plane_hooks_api is not None:
cfg["control_plane_hooks_api"] = self.control_plane_hooks_api

View File

@@ -14,7 +14,7 @@ from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.paths import BASE_DIR
from fixtures.pg_config import PgConfigKey
from fixtures.utils import WITH_SANITIZERS, subprocess_capture
from fixtures.utils import subprocess_capture
from werkzeug.wrappers.response import Response
if TYPE_CHECKING:
@@ -148,15 +148,6 @@ def test_remote_extensions(
pg_config: PgConfig,
extension: RemoteExtension,
):
if WITH_SANITIZERS and extension is RemoteExtension.WITH_LIB:
pytest.skip(
"""
For this test to work with sanitizers enabled, we would need to
compile the dummy Postgres extension with the same CFLAGS that we
compile Postgres and the neon extension with to link the sanitizers.
"""
)
# Setup a mock nginx S3 gateway which will return our test extension.
(host, port) = httpserver_listen_address
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"

View File

@@ -1,147 +0,0 @@
import random
import threading
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
def check_pinned_entries(cur):
# some LFC buffer can be temporary locked by autovacuum or background writer
for _ in range(10):
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'")
n_pinned = cur.fetchall()[0][0]
if n_pinned == 0:
break
time.sleep(1)
assert n_pinned == 0
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version
cur.execute("alter extension neon update to '1.6'")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
assert lfc_used_pages > 10000
assert (
prewarm_info[0] > 0
and prewarm_info[1] > 0
and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
)
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
check_pinned_entries(cur)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 10000
n_threads = 4
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute(
"create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)"
)
cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
running = True
def workload():
conn = endpoint.connect()
cur = conn.cursor()
n_transfers = 0
while running:
src = random.randint(1, n_records)
dst = random.randint(1, n_records)
cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
n_transfers += 1
log.info(f"Number of transfers: {n_transfers}")
def prewarm():
conn = endpoint.connect()
cur = conn.cursor()
n_prewarms = 0
while running:
cur.execute("alter system set neon.file_cache_size_limit='1MB'")
cur.execute("select pg_reload_conf()")
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
cur.execute("select pg_reload_conf()")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
n_prewarms += 1
log.info(f"Number of prewarms: {n_prewarms}")
workload_threads = []
for _ in range(n_threads):
t = threading.Thread(target=workload)
workload_threads.append(t)
t.start()
prewarm_thread = threading.Thread(target=prewarm)
prewarm_thread.start()
time.sleep(20)
running = False
for t in workload_threads:
t.join()
prewarm_thread.join()
cur.execute("select sum(balance) from accounts")
total_balance = cur.fetchall()[0][0]
assert total_balance == 0
check_pinned_entries(cur)

View File

@@ -3,7 +3,7 @@
Tests in this module exercise the pageserver's behavior around generation numbers,
as defined in docs/rfcs/025-generation-numbers.md. Briefly, the behaviors we require
of the pageserver are:
- Do not start a tenant without a generation number
- Do not start a tenant without a generation number if control_plane_api is set
- Remote objects must be suffixed with generation
- Deletions may only be executed after validating generation
- Updates to remote_consistent_lsn may only be made visible after validating generation

View File

@@ -511,4 +511,5 @@ PER_METRIC_VERIFIERS = {
"written_data_bytes_delta": WrittenDataDeltaVerifier,
"timeline_logical_size": CannotVerifyAnything,
"synthetic_storage_size": SyntheticSizeVerifier,
"timeline_changed_bytes_from_parent": CannotVerifyAnything,
}