Compare commits

..

7 Commits

Author SHA1 Message Date
Christian Schwarz
98d0bca8d1 Merge branch 'problame/repartition-bail-on-concurrent-call' into problame/avoid-count-deltas-if-no-changes 2024-02-21 18:05:29 +00:00
Christian Schwarz
bafa6d37d9 Merge remote-tracking branch 'origin/main' into problame/repartition-bail-on-concurrent-call 2024-02-21 18:05:28 +00:00
Christian Schwarz
ad23c945df WIP 2024-02-21 18:04:04 +00:00
Christian Schwarz
4536caa253 LayerMap: keep track of rebuilds 2024-02-21 17:02:39 +00:00
Christian Schwarz
d7920a2a57 Merge remote-tracking branch 'origin/main' into problame/repartition-bail-on-concurrent-call 2024-02-21 16:01:29 +00:00
Christian Schwarz
f990e07581 Merge remote-tracking branch 'origin/main' into problame/repartition-bail-on-concurrent-call 2024-02-21 15:56:07 +00:00
Christian Schwarz
1af2f3caf1 Timeline::repartition: enforce no concurrent callers & lsn to not move backwards
This PR enforces aspects of `Timeline::repartition` that were already
true at runtime:

- it's not called concurrently, so, bail out if it is anyway (see
  comment why it's not called concurrently)
- the `lsn` should never be moving backwards over the lifetime of a
  Timeline object, because last_record_lsn() can only move forwards
  over the lifetime of a Timeline object

part of #6861
2024-02-21 15:52:28 +00:00
94 changed files with 1431 additions and 2013 deletions

View File

@@ -39,7 +39,7 @@ runs:
PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
if [ "${PR_NUMBER}" != "null" ]; then
BRANCH_OR_PR=pr-${PR_NUMBER}
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || [ "${GITHUB_REF_NAME}" = "release-proxy" ]; then
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ]; then
# Shortcut for special branches
BRANCH_OR_PR=${GITHUB_REF_NAME}
else

View File

@@ -19,7 +19,7 @@ runs:
PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
if [ "${PR_NUMBER}" != "null" ]; then
BRANCH_OR_PR=pr-${PR_NUMBER}
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || [ "${GITHUB_REF_NAME}" = "release-proxy" ]; then
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ]; then
# Shortcut for special branches
BRANCH_OR_PR=${GITHUB_REF_NAME}
else

View File

@@ -16,14 +16,8 @@ concurrency:
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
jobs:
check-permissions:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
uses: ./.github/workflows/check-permissions.yml
with:
github-event-name: ${{ github.event_name}}
actionlint:
needs: [ check-permissions ]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

View File

@@ -5,7 +5,6 @@ on:
branches:
- main
- release
- release-proxy
pull_request:
defaults:
@@ -28,9 +27,24 @@ env:
jobs:
check-permissions:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
uses: ./.github/workflows/check-permissions.yml
with:
github-event-name: ${{ github.event_name}}
runs-on: ubuntu-latest
steps:
- name: Disallow PRs from forks
if: |
github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository
run: |
if [ "${{ contains(fromJSON('["OWNER", "MEMBER", "COLLABORATOR"]'), github.event.pull_request.author_association) }}" = "true" ]; then
MESSAGE="Please create a PR from a branch of ${GITHUB_REPOSITORY} instead of a fork"
else
MESSAGE="The PR should be reviewed and labelled with 'approved-for-ci-run' to trigger a CI run"
fi
echo >&2 "We don't run CI for PRs from forks"
echo >&2 "${MESSAGE}"
exit 1
cancel-previous-e2e-tests:
needs: [ check-permissions ]
@@ -68,8 +82,6 @@ jobs:
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
@@ -685,7 +697,7 @@ jobs:
})
trigger-e2e-tests:
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' }}
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' }}
needs: [ check-permissions, promote-images, tag ]
uses: ./.github/workflows/trigger-e2e-tests.yml
secrets: inherit
@@ -693,173 +705,158 @@ jobs:
neon-image:
needs: [ check-permissions, build-buildtools-image, tag ]
runs-on: [ self-hosted, gen3, large ]
container: gcr.io/kaniko-project/executor:v1.9.2-debug
defaults:
run:
shell: sh -eu {0}
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v1 # v3 won't work with kaniko
with:
submodules: true
fetch-depth: 0
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker
- name: Set custom docker config directory
- name: Configure ECR and Docker Hub login
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v3
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- uses: docker/login-action@v3
with:
registry: 369495373322.dkr.ecr.eu-central-1.amazonaws.com
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
- name: Kaniko build neon
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
--build-arg BUILD_TAG=${{ needs.tag.outputs.build-tag }}
--build-arg TAG=${{ needs.build-buildtools-image.outputs.build-tools-tag }}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
- uses: docker/build-push-action@v5
with:
context: .
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-buildtools-image.outputs.build-tools-tag }}
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
provenance: false
push: true
pull: true
file: Dockerfile
cache-from: type=registry,ref=neondatabase/neon:cache
cache-to: type=registry,ref=neondatabase/neon:cache,mode=max
tags: |
369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
neondatabase/neon:${{needs.tag.outputs.build-tag}}
- name: Remove custom docker config directory
if: always()
run: |
rm -rf .docker-custom
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ check-permissions, build-buildtools-image, tag ]
container: gcr.io/kaniko-project/executor:v1.9.2-debug
defaults:
run:
shell: sh -eu {0}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 0
uses: actions/checkout@v1 # v3 won't work with kaniko
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker
- name: Set custom docker config directory
- name: Configure ECR and Docker Hub login
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v3
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- uses: docker/login-action@v3
with:
registry: 369495373322.dkr.ecr.eu-central-1.amazonaws.com
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
- name: Kaniko build compute tools
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
--build-arg TAG=${{needs.build-buildtools-image.outputs.build-tools-tag}}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--dockerfile Dockerfile.compute-tools
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
- uses: docker/build-push-action@v5
with:
context: .
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG=${{needs.tag.outputs.build-tag}}
TAG=${{needs.build-buildtools-image.outputs.build-tools-tag}}
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
provenance: false
push: true
pull: true
file: Dockerfile.compute-tools
cache-from: type=registry,ref=neondatabase/compute-tools:cache
cache-to: type=registry,ref=neondatabase/compute-tools:cache,mode=max
tags: |
369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
- name: Remove custom docker config directory
if: always()
run: |
rm -rf .docker-custom
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
compute-node-image:
needs: [ check-permissions, build-buildtools-image, tag ]
runs-on: [ self-hosted, gen3, large ]
container:
image: gcr.io/kaniko-project/executor:v1.9.2-debug
# Workaround for "Resolving download.osgeo.org (download.osgeo.org)... failed: Temporary failure in name resolution.""
# Should be prevented by https://github.com/neondatabase/neon/issues/4281
options: --add-host=download.osgeo.org:140.211.15.30
strategy:
fail-fast: false
matrix:
version: [ v14, v15, v16 ]
defaults:
run:
shell: sh -eu {0}
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v1 # v3 won't work with kaniko
with:
submodules: true
fetch-depth: 0
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker
- name: Set custom docker config directory
- name: Configure ECR and Docker Hub login
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v3
with:
# Disable parallelism for docker buildkit.
# As we already build everything with `make -j$(nproc)`, running it in additional level of parallelisam blows up the Runner.
config-inline: |
[worker.oci]
max-parallelism = 1
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- uses: docker/login-action@v3
with:
registry: 369495373322.dkr.ecr.eu-central-1.amazonaws.com
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
- name: Kaniko build compute node with extensions
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
--build-arg PG_VERSION=${{ matrix.version }}
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
--build-arg TAG=${{needs.build-buildtools-image.outputs.build-tools-tag}}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--dockerfile Dockerfile.compute-node
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--cleanup
- uses: docker/build-push-action@v5
with:
context: .
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version }}
BUILD_TAG=${{needs.tag.outputs.build-tag}}
TAG=${{needs.build-buildtools-image.outputs.build-tools-tag}}
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
provenance: false
push: true
pull: true
file: Dockerfile.compute-node
cache-from: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache
cache-to: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache,mode=max
tags: |
369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
- name: Remove custom docker config directory
if: always()
run: |
rm -rf .docker-custom
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
vm-compute-node-image:
needs: [ check-permissions, tag, compute-node-image ]
@@ -970,7 +967,9 @@ jobs:
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v16:${{needs.tag.outputs.build-tag}} vm-compute-node-v16
- name: Add latest tag to images
if: github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy'
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
@@ -982,7 +981,9 @@ jobs:
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v16:${{needs.tag.outputs.build-tag}} latest
- name: Push images to production ECR
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest
@@ -1006,7 +1007,9 @@ jobs:
crane push vm-compute-node-v16 neondatabase/vm-compute-node-v16:${{needs.tag.outputs.build-tag}}
- name: Push latest tags to Docker Hub
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane tag neondatabase/neon:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
@@ -1096,7 +1099,7 @@ jobs:
deploy:
needs: [ check-permissions, promote-images, tag, regress-tests, trigger-custom-extensions-build-and-wait ]
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
@@ -1131,28 +1134,14 @@ jobs:
# TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
-f deployPgSniRouter=false \
-f deployProxy=false \
-f deployStorage=true \
-f deployStorageBroker=true \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
-f deployPgSniRouter=true \
-f deployProxy=true \
-f deployStorage=false \
-f deployStorageBroker=false \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}}
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}}
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
fi
- name: Create git tag
if: github.ref_name == 'release' || github.ref_name == 'release-proxy'
if: github.ref_name == 'release'
uses: actions/github-script@v7
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
@@ -1165,7 +1154,6 @@ jobs:
sha: context.sha,
})
# TODO: check how GitHub releases looks for proxy releases and enable it if it's ok
- name: Create GitHub release
if: github.ref_name == 'release'
uses: actions/github-script@v7

View File

@@ -1,36 +0,0 @@
name: Check Permissions
on:
workflow_call:
inputs:
github-event-name:
required: true
type: string
defaults:
run:
shell: bash -euo pipefail {0}
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
jobs:
check-permissions:
runs-on: ubuntu-latest
steps:
- name: Disallow CI runs on PRs from forks
if: |
inputs.github-event-name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository
run: |
if [ "${{ contains(fromJSON('["OWNER", "MEMBER", "COLLABORATOR"]'), github.event.pull_request.author_association) }}" = "true" ]; then
MESSAGE="Please create a PR from a branch of ${GITHUB_REPOSITORY} instead of a fork"
else
MESSAGE="The PR should be reviewed and labelled with 'approved-for-ci-run' to trigger a CI run"
fi
# TODO: use actions/github-script to post this message as a PR comment
echo >&2 "We don't run CI for PRs from forks"
echo >&2 "${MESSAGE}"
exit 1

View File

@@ -1,32 +0,0 @@
# A workflow from
# https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries
name: cleanup caches by a branch
on:
pull_request:
types:
- closed
jobs:
cleanup:
runs-on: ubuntu-latest
steps:
- name: Cleanup
run: |
gh extension install actions/gh-actions-cache
echo "Fetching list of cache key"
cacheKeysForPR=$(gh actions-cache list -R $REPO -B $BRANCH -L 100 | cut -f 1 )
## Setting this to not fail the workflow while deleting cache keys.
set +e
echo "Deleting caches..."
for cacheKey in $cacheKeysForPR
do
gh actions-cache delete $cacheKey -R $REPO -B $BRANCH --confirm
done
echo "Done"
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO: ${{ github.repository }}
BRANCH: refs/pull/${{ github.event.pull_request.number }}/merge

View File

@@ -20,14 +20,7 @@ env:
COPT: '-Werror'
jobs:
check-permissions:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
uses: ./.github/workflows/check-permissions.yml
with:
github-event-name: ${{ github.event_name}}
check-macos-build:
needs: [ check-permissions ]
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
@@ -123,8 +116,8 @@ jobs:
run: ./run_clippy.sh
check-linux-arm-build:
needs: [ check-permissions ]
timeout-minutes: 90
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
runs-on: [ self-hosted, dev, arm64 ]
env:
@@ -244,8 +237,8 @@ jobs:
cargo nextest run --package remote_storage --test test_real_azure
check-codestyle-rust-arm:
needs: [ check-permissions ]
timeout-minutes: 90
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
runs-on: [ self-hosted, dev, arm64 ]
container:
@@ -316,7 +309,6 @@ jobs:
run: cargo deny check
gather-rust-build-stats:
needs: [ check-permissions ]
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||

View File

@@ -2,31 +2,12 @@ name: Create Release Branch
on:
schedule:
# It should be kept in sync with if-condition in jobs
- cron: '0 6 * * MON' # Storage release
- cron: '0 6 * * THU' # Proxy release
- cron: '0 6 * * 1'
workflow_dispatch:
inputs:
create-storage-release-branch:
type: boolean
description: 'Create Storage release PR'
required: false
create-proxy-release-branch:
type: boolean
description: 'Create Proxy release PR'
required: false
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
defaults:
run:
shell: bash -euo pipefail {0}
jobs:
create-storage-release-branch:
if: ${{ github.event.schedule == '0 6 * * MON' || format('{0}', inputs.create-storage-release-branch) == 'true' }}
runs-on: ubuntu-latest
create_release_branch:
runs-on: [ ubuntu-latest ]
permissions:
contents: write # for `git push`
@@ -37,67 +18,27 @@ jobs:
with:
ref: main
- name: Set environment variables
run: |
echo "RELEASE_DATE=$(date +'%Y-%m-%d')" | tee -a $GITHUB_ENV
echo "RELEASE_BRANCH=rc/$(date +'%Y-%m-%d')" | tee -a $GITHUB_ENV
- name: Get current date
id: date
run: echo "date=$(date +'%Y-%m-%d')" >> $GITHUB_OUTPUT
- name: Create release branch
run: git checkout -b $RELEASE_BRANCH
run: git checkout -b releases/${{ steps.date.outputs.date }}
- name: Push new branch
run: git push origin $RELEASE_BRANCH
run: git push origin releases/${{ steps.date.outputs.date }}
- name: Create pull request into release
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
cat << EOF > body.md
## Release ${RELEASE_DATE}
## Release ${{ steps.date.outputs.date }}
**Please merge this Pull Request using 'Create a merge commit' button**
**Please merge this PR using 'Create a merge commit'!**
EOF
gh pr create --title "Release ${RELEASE_DATE}" \
gh pr create --title "Release ${{ steps.date.outputs.date }}" \
--body-file "body.md" \
--head "${RELEASE_BRANCH}" \
--head "releases/${{ steps.date.outputs.date }}" \
--base "release"
create-proxy-release-branch:
if: ${{ github.event.schedule == '0 6 * * THU' || format('{0}', inputs.create-proxy-release-branch) == 'true' }}
runs-on: ubuntu-latest
permissions:
contents: write # for `git push`
steps:
- name: Check out code
uses: actions/checkout@v4
with:
ref: main
- name: Set environment variables
run: |
echo "RELEASE_DATE=$(date +'%Y-%m-%d')" | tee -a $GITHUB_ENV
echo "RELEASE_BRANCH=rc/proxy/$(date +'%Y-%m-%d')" | tee -a $GITHUB_ENV
- name: Create release branch
run: git checkout -b $RELEASE_BRANCH
- name: Push new branch
run: git push origin $RELEASE_BRANCH
- name: Create pull request into release
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
cat << EOF > body.md
## Proxy release ${RELEASE_DATE}
**Please merge this Pull Request using 'Create a merge commit' button**
EOF
gh pr create --title "Proxy release ${RELEASE_DATE}}" \
--body-file "body.md" \
--head "${RELEASE_BRANCH}" \
--base "release-proxy"

View File

@@ -51,8 +51,6 @@ jobs:
echo "tag=$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')

1
Cargo.lock generated
View File

@@ -3552,7 +3552,6 @@ dependencies = [
"const_format",
"enum-map",
"hex",
"humantime",
"humantime-serde",
"itertools",
"postgres_ffi",

View File

@@ -786,22 +786,6 @@ RUN wget https://github.com/sraoss/pg_ivm/archive/refs/tags/v1.7.tar.gz -O pg_iv
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_ivm.control
#########################################################################################
#
# Layer "pg_partman"
# compile pg_partman extension
#
#########################################################################################
FROM build-deps AS pg-partman-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/pgpartman/pg_partman/archive/refs/tags/v5.0.1.tar.gz -O pg_partman.tar.gz && \
echo "75b541733a9659a6c90dbd40fccb904a630a32880a6e3044d0c4c5f4c8a65525 pg_partman.tar.gz" | sha256sum --check && \
mkdir pg_partman-src && cd pg_partman-src && tar xvzf ../pg_partman.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_partman.control
#########################################################################################
#
@@ -845,7 +829,6 @@ COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -5,7 +5,7 @@
Neon is a serverless open-source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
## Quick start
Try the [Neon Free Tier](https://neon.tech) to create a serverless Postgres instance. Then connect to it with your preferred Postgres client (psql, dbeaver, etc) or use the online [SQL Editor](https://neon.tech/docs/get-started-with-neon/query-with-neon-sql-editor/). See [Connect from any application](https://neon.tech/docs/connect/connect-from-any-app/) for connection instructions.
Try the [Neon Free Tier](https://neon.tech/docs/introduction/technical-preview-free-tier/) to create a serverless Postgres instance. Then connect to it with your preferred Postgres client (psql, dbeaver, etc) or use the online [SQL Editor](https://neon.tech/docs/get-started-with-neon/query-with-neon-sql-editor/). See [Connect from any application](https://neon.tech/docs/connect/connect-from-any-app/) for connection instructions.
Alternatively, compile and run the project [locally](#running-local-installation).

View File

@@ -82,12 +82,6 @@ pub fn write_postgres_conf(
ComputeMode::Replica => {
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;
// Inform the replica about the primary state
// Default is 'false'
if let Some(primary_is_running) = spec.primary_is_running {
writeln!(file, "neon.primary_is_running={}", primary_is_running)?;
}
}
}

View File

@@ -655,9 +655,6 @@ pub fn handle_grants(
// remove this code if possible. The worst thing that could happen is that
// user won't be able to use public schema in NEW databases created in the
// very OLD project.
//
// Also, alter default permissions so that relations created by extensions can be
// used by neon_superuser without permission issues.
let grant_query = "DO $$\n\
BEGIN\n\
IF EXISTS(\n\
@@ -676,8 +673,6 @@ pub fn handle_grants(
GRANT CREATE ON SCHEMA public TO web_access;\n\
END IF;\n\
END IF;\n\
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\
END\n\
$$;"
.to_string();
@@ -782,12 +777,6 @@ BEGIN
END
$$;"#,
"GRANT pg_monitor TO neon_superuser WITH ADMIN OPTION",
// Don't remove: these are some SQLs that we originally applied in migrations but turned out to execute somewhere else.
"",
"",
"",
"",
// Add new migrations below.
];
let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
@@ -814,13 +803,8 @@ $$;"#,
client.simple_query(query)?;
while current_migration < migrations.len() {
let migration = &migrations[current_migration];
if migration.is_empty() {
info!("Skip migration id={}", current_migration);
} else {
info!("Running migration:\n{}\n", migration);
client.simple_query(migration)?;
}
info!("Running migration:\n{}\n", migrations[current_migration]);
client.simple_query(migrations[current_migration])?;
current_migration += 1;
}
let setval = format!(

View File

@@ -66,7 +66,14 @@ fn get_state(request: &Request<Body>) -> &HttpState {
async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
let state = get_state(&req);
json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
json_response(
StatusCode::OK,
state
.service
.re_attach(reattach_req)
.await
.map_err(ApiError::InternalServerError)?,
)
}
/// Pageserver calls into this before doing deletions, to confirm that it still
@@ -325,10 +332,7 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
}
let state = get_state(&req);
json_response(
StatusCode::OK,
state.service.node_configure(config_req).await?,
)
json_response(StatusCode::OK, state.service.node_configure(config_req)?)
}
async fn handle_tenant_shard_split(

View File

@@ -10,7 +10,7 @@ use crate::persistence::NodePersistence;
///
/// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
/// implementation of serialization on this type is only for debug dumps.
#[derive(Clone, Serialize)]
#[derive(Clone, Serialize, Eq, PartialEq)]
pub(crate) struct Node {
pub(crate) id: NodeId,

View File

@@ -6,7 +6,7 @@ use std::time::Duration;
use self::split_state::SplitState;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use control_plane::attachment_service::NodeSchedulingPolicy;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
@@ -130,10 +130,24 @@ impl Persistence {
}
/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
let nodes: Vec<NodePersistence> = self
pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
let nodes: Vec<Node> = self
.with_conn(move |conn| -> DatabaseResult<_> {
Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
Ok(crate::schema::nodes::table
.load::<NodePersistence>(conn)?
.into_iter()
.map(|n| Node {
id: NodeId(n.node_id as u64),
// At startup we consider a node offline until proven otherwise.
availability: NodeAvailability::Offline,
scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
.expect("Bad scheduling policy in DB"),
listen_http_addr: n.listen_http_addr,
listen_http_port: n.listen_http_port as u16,
listen_pg_addr: n.listen_pg_addr,
listen_pg_port: n.listen_pg_port as u16,
})
.collect::<Vec<Node>>())
})
.await?;
@@ -142,31 +156,6 @@ impl Persistence {
Ok(nodes)
}
pub(crate) async fn update_node(
&self,
input_node_id: NodeId,
input_scheduling: NodeSchedulingPolicy,
) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
let updated = self
.with_conn(move |conn| {
let updated = diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.set((scheduling_policy.eq(String::from(input_scheduling)),))
.execute(conn)?;
Ok(updated)
})
.await?;
if updated != 1 {
Err(DatabaseError::Logical(format!(
"Node {node_id:?} not found for update",
)))
} else {
Ok(())
}
}
/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
@@ -517,7 +506,7 @@ pub(crate) struct TenantShardPersistence {
}
/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
pub(crate) node_id: i64,

View File

@@ -438,7 +438,7 @@ impl Reconciler {
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
tracing::info!(%node_id, "Observed configuration already correct.")
tracing::info!("Observed configuration already correct.")
}
_ => {
// In all cases other than a matching observed configuration, we will
@@ -449,7 +449,7 @@ impl Reconciler {
.increment_generation(self.tenant_shard_id, node_id)
.await?;
wanted_conf.generation = self.generation.into();
tracing::info!(%node_id, "Observed configuration requires update.");
tracing::info!("Observed configuration requires update.");
self.location_config(node_id, wanted_conf, None).await?;
self.compute_notify().await?;
}

View File

@@ -175,33 +175,6 @@ impl Scheduler {
}
}
/// Where we have several nodes to choose from, for example when picking a secondary location
/// to promote to an attached location, this method may be used to pick the best choice based
/// on the scheduler's knowledge of utilization and availability.
///
/// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
/// caller can pick a node some other way.
pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
if nodes.is_empty() {
return None;
}
let node = nodes
.iter()
.map(|node_id| {
let may_schedule = self
.nodes
.get(node_id)
.map(|n| n.may_schedule)
.unwrap_or(false);
(*node_id, may_schedule)
})
.max_by_key(|(_n, may_schedule)| *may_schedule);
// If even the preferred node has may_schedule==false, return None
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
}
pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
@@ -251,45 +224,44 @@ impl Scheduler {
}
}
#[cfg(test)]
pub(crate) mod test_utils {
use crate::node::Node;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use std::collections::HashMap;
use utils::id::NodeId;
/// Test helper: synthesize the requested number of nodes, all in active state.
///
/// Node IDs start at one.
pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
(1..n + 1)
.map(|i| {
(
NodeId(i),
Node {
id: NodeId(i),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: format!("httphost-{i}"),
listen_http_port: 80 + i as u16,
listen_pg_addr: format!("pghost-{i}"),
listen_pg_port: 5432 + i as u16,
},
)
})
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use utils::id::NodeId;
use crate::tenant_state::IntentState;
use crate::{node::Node, tenant_state::IntentState};
#[test]
fn scheduler_basic() -> anyhow::Result<()> {
let nodes = test_utils::make_test_nodes(2);
let mut nodes = HashMap::new();
nodes.insert(
NodeId(1),
Node {
id: NodeId(1),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: String::new(),
listen_http_port: 0,
listen_pg_addr: String::new(),
listen_pg_port: 0,
},
);
nodes.insert(
NodeId(2),
Node {
id: NodeId(2),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: String::new(),
listen_http_port: 0,
listen_pg_addr: String::new(),
listen_pg_port: 0,
},
);
let mut scheduler = Scheduler::new(nodes.values());
let mut t1_intent = IntentState::new();

View File

@@ -56,11 +56,6 @@ use crate::{
PlacementPolicy, Sequence,
};
// For operations that should be quick, like attaching a new tenant
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
// For operations that might be slow, like migrating a tenant with
// some data in it.
const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
/// How long [`Service::startup_reconcile`] is allowed to take before it should give
@@ -484,8 +479,8 @@ impl Service {
async move {
if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
tracing::error!(
%tenant_shard_id,
%node_id,
tenant_shard_id=%tenant_shard_id,
node_id=%node_id,
"Failed to notify compute on startup for shard: {e}"
);
None
@@ -622,22 +617,7 @@ impl Service {
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
tracing::info!("Loading nodes from database...");
let nodes = persistence
.list_nodes()
.await?
.into_iter()
.map(|n| Node {
id: NodeId(n.node_id as u64),
// At startup we consider a node offline until proven otherwise.
availability: NodeAvailability::Offline,
scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
.expect("Bad scheduling policy in DB"),
listen_http_addr: n.listen_http_addr,
listen_http_port: n.listen_http_port as u16,
listen_pg_addr: n.listen_pg_addr,
listen_pg_port: n.listen_pg_port as u16,
})
.collect::<Vec<_>>();
let nodes = persistence.list_nodes().await?;
let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
tracing::info!("Loaded {} nodes from database.", nodes.len());
@@ -929,16 +909,7 @@ impl Service {
pub(crate) async fn re_attach(
&self,
reattach_req: ReAttachRequest,
) -> Result<ReAttachResponse, ApiError> {
// Take a re-attach as indication that the node is available: this is a precursor to proper
// heartbeating in https://github.com/neondatabase/neon/issues/6844
self.node_configure(NodeConfigureRequest {
node_id: reattach_req.node_id,
availability: Some(NodeAvailability::Active),
scheduling: None,
})
.await?;
) -> anyhow::Result<ReAttachResponse> {
// Ordering: we must persist generation number updates before making them visible in the in-memory state
let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
@@ -1029,16 +1000,6 @@ impl Service {
&self,
create_req: TenantCreateRequest,
) -> Result<TenantCreateResponse, ApiError> {
let (response, waiters) = self.do_tenant_create(create_req).await?;
self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
Ok(response)
}
pub(crate) async fn do_tenant_create(
&self,
create_req: TenantCreateRequest,
) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
// This service expects to handle sharding itself: it is an error to try and directly create
// a particular shard here.
let tenant_id = if !create_req.new_tenant_id.is_unsharded() {
@@ -1188,12 +1149,11 @@ impl Service {
(waiters, response_shards)
};
Ok((
TenantCreateResponse {
shards: response_shards,
},
waiters,
))
self.await_waiters(waiters).await?;
Ok(TenantCreateResponse {
shards: response_shards,
})
}
/// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
@@ -1201,9 +1161,8 @@ impl Service {
async fn await_waiters(
&self,
waiters: Vec<ReconcilerWaiter>,
timeout: Duration,
) -> Result<(), ReconcileWaitError> {
let deadline = Instant::now().checked_add(timeout).unwrap();
let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap();
for waiter in waiters {
let timeout = deadline.duration_since(Instant::now());
waiter.wait_timeout(timeout).await?;
@@ -1341,8 +1300,12 @@ impl Service {
}
};
let waiters = if let Some(create_req) = maybe_create {
let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
// TODO: if we timeout/fail on reconcile, we should still succeed this request,
// because otherwise a broken compute hook causes a feedback loop where
// location_config returns 500 and gets retried forever.
if let Some(create_req) = maybe_create {
let create_resp = self.tenant_create(create_req).await?;
result.shards = create_resp
.shards
.into_iter()
@@ -1351,25 +1314,19 @@ impl Service {
shard_id: s.shard_id,
})
.collect();
waiters
} else {
waiters
};
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
// Do not treat a reconcile error as fatal: we have already applied any requested
// Intent changes, and the reconcile can fail for external reasons like unavailable
// compute notification API. In these cases, it is important that we do not
// cause the cloud control plane to retry forever on this API.
tracing::warn!(
"Failed to reconcile after /location_config: {e}, returning success anyway"
);
// This was an update, wait for reconciliation
if let Err(e) = self.await_waiters(waiters).await {
// Do not treat a reconcile error as fatal: we have already applied any requested
// Intent changes, and the reconcile can fail for external reasons like unavailable
// compute notification API. In these cases, it is important that we do not
// cause the cloud control plane to retry forever on this API.
tracing::warn!(
"Failed to reconcile after /location_config: {e}, returning success anyway"
);
}
}
// Logging the full result is useful because it lets us cross-check what the cloud control
// plane's tenant_shards table should contain.
tracing::info!("Complete, returning {result:?}");
Ok(result)
}
@@ -2342,11 +2299,7 @@ impl Service {
.context("Scheduler checks")
.map_err(ApiError::InternalServerError)?;
let expect_nodes = locked
.nodes
.values()
.map(|n| n.to_persistent())
.collect::<Vec<_>>();
let expect_nodes = locked.nodes.values().cloned().collect::<Vec<_>>();
let expect_shards = locked
.tenants
@@ -2358,8 +2311,8 @@ impl Service {
};
let mut nodes = self.persistence.list_nodes().await?;
expect_nodes.sort_by_key(|n| n.node_id);
nodes.sort_by_key(|n| n.node_id);
expect_nodes.sort_by_key(|n| n.id);
nodes.sort_by_key(|n| n.id);
if nodes != expect_nodes {
tracing::error!("Consistency check failed on nodes.");
@@ -2373,9 +2326,6 @@ impl Service {
serde_json::to_string(&nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Node consistency failure"
)));
}
let mut shards = self.persistence.list_tenant_shards().await?;
@@ -2386,17 +2336,14 @@ impl Service {
tracing::error!("Consistency check failed on shards.");
tracing::error!(
"Shards in memory: {}",
serde_json::to_string(&expect_shards)
serde_json::to_string(&expect_nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
tracing::error!(
"Shards in database: {}",
serde_json::to_string(&shards)
serde_json::to_string(&nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Shard consistency failure"
)));
}
Ok(())
@@ -2522,18 +2469,7 @@ impl Service {
Ok(())
}
pub(crate) async fn node_configure(
&self,
config_req: NodeConfigureRequest,
) -> Result<(), ApiError> {
if let Some(scheduling) = config_req.scheduling {
// Scheduling is a persistent part of Node: we must write updates to the database before
// applying them in memory
self.persistence
.update_node(config_req.node_id, scheduling)
.await?;
}
pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();

View File

@@ -143,23 +143,6 @@ impl IntentState {
}
}
/// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
/// secondary to attached while maintaining the scheduler's reference counts.
pub(crate) fn promote_attached(
&mut self,
_scheduler: &mut Scheduler,
promote_secondary: NodeId,
) {
// If we call this with a node that isn't in secondary, it would cause incorrect
// scheduler reference counting, since we assume the node is already referenced as a secondary.
debug_assert!(self.secondary.contains(&promote_secondary));
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.secondary.retain(|n| n != &promote_secondary);
self.attached = Some(promote_secondary);
}
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
debug_assert!(!self.secondary.contains(&new_secondary));
scheduler.node_inc_ref(new_secondary);
@@ -214,8 +197,6 @@ impl IntentState {
/// Returns true if a change was made
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.attached = None;
self.secondary.push(node_id);
true
@@ -389,9 +370,6 @@ impl TenantState {
// All remaining observed locations generate secondary intents. This includes None
// observations, as these may well have some local content on disk that is usable (this
// is an edge case that might occur if we restarted during a migration or other change)
//
// We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
// will take care of promoting one of these secondaries to be attached.
self.observed.locations.keys().for_each(|node_id| {
if Some(*node_id) != self.intent.attached {
self.intent.secondary.push(*node_id);
@@ -399,33 +377,6 @@ impl TenantState {
});
}
/// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
/// attached pageserver for a shard.
///
/// Returns whether we modified it, and the NodeId selected.
fn schedule_attached(
&mut self,
scheduler: &mut Scheduler,
) -> Result<(bool, NodeId), ScheduleError> {
// No work to do if we already have an attached tenant
if let Some(node_id) = self.intent.attached {
return Ok((false, node_id));
}
if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
// Promote a secondary
tracing::debug!("Promoted secondary {} to attached", promote_secondary);
self.intent.promote_attached(scheduler, promote_secondary);
Ok((true, promote_secondary))
} else {
// Pick a fresh node: either we had no secondaries or none were schedulable
let node_id = scheduler.schedule_shard(&self.intent.secondary)?;
tracing::debug!("Selected {} as attached", node_id);
self.intent.set_attached(scheduler, Some(node_id));
Ok((true, node_id))
}
}
pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
// TODO: before scheduling new nodes, check if any existing content in
// self.intent refers to pageservers that are offline, and pick other
@@ -436,15 +387,19 @@ impl TenantState {
// Build the set of pageservers already in use by this tenant, to avoid scheduling
// more work on the same pageservers we're already using.
let mut used_pageservers = self.intent.all_pageservers();
let mut modified = false;
use PlacementPolicy::*;
match self.policy {
Single => {
// Should have exactly one attached, and zero secondaries
let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?;
modified |= modified_attached;
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.set_attached(scheduler, Some(node_id));
used_pageservers.push(node_id);
modified = true;
}
if !self.intent.secondary.is_empty() {
self.intent.clear_secondary(scheduler);
modified = true;
@@ -452,10 +407,13 @@ impl TenantState {
}
Double(secondary_count) => {
// Should have exactly one attached, and N secondaries
let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
modified |= modified_attached;
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.set_attached(scheduler, Some(node_id));
used_pageservers.push(node_id);
modified = true;
}
let mut used_pageservers = vec![attached_node_id];
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.push_secondary(scheduler, node_id);
@@ -737,95 +695,10 @@ impl TenantState {
shard_count: self.tenant_shard_id.shard_count.literal() as i32,
shard_stripe_size: self.shard.stripe_size.0 as i32,
generation: self.generation.into().unwrap_or(0) as i32,
generation_pageserver: self
.intent
.get_attached()
.map(|n| n.0 as i64)
.unwrap_or(i64::MAX),
generation_pageserver: i64::MAX,
placement_policy: serde_json::to_string(&self.policy).unwrap(),
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
}
}
}
#[cfg(test)]
pub(crate) mod tests {
use pageserver_api::shard::{ShardCount, ShardNumber};
use utils::id::TenantId;
use crate::scheduler::test_utils::make_test_nodes;
use super::*;
fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantState {
let tenant_id = TenantId::generate();
let shard_number = ShardNumber(0);
let shard_count = ShardCount::new(1);
let tenant_shard_id = TenantShardId {
tenant_id,
shard_number,
shard_count,
};
TenantState::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
shard_count,
pageserver_api::shard::ShardStripeSize(32768),
)
.unwrap(),
policy,
)
}
/// Test the scheduling behaviors used when a tenant configured for HA is subject
/// to nodes being marked offline.
#[test]
fn tenant_ha_scheduling() -> anyhow::Result<()> {
// Start with three nodes. Our tenant will only use two. The third one is
// expected to remain unused.
let mut nodes = make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
tenant_state
.schedule(&mut scheduler)
.expect("we have enough nodes, scheduling should work");
// Expect to initially be schedule on to different nodes
assert_eq!(tenant_state.intent.secondary.len(), 1);
assert!(tenant_state.intent.attached.is_some());
let attached_node_id = tenant_state.intent.attached.unwrap();
let secondary_node_id = *tenant_state.intent.secondary.iter().last().unwrap();
assert_ne!(attached_node_id, secondary_node_id);
// Notifying the attached node is offline should demote it to a secondary
let changed = tenant_state.intent.notify_offline(attached_node_id);
assert!(changed);
// Update the scheduler state to indicate the node is offline
nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline;
scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
// Scheduling the node should promote the still-available secondary node to attached
tenant_state
.schedule(&mut scheduler)
.expect("active nodes are available");
assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
// The original attached node should have been retained as a secondary
assert_eq!(
*tenant_state.intent.secondary.iter().last().unwrap(),
attached_node_id
);
tenant_state.intent.clear(&mut scheduler);
Ok(())
}
}

View File

@@ -616,7 +616,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
let tenant_id = get_tenant_id(create_match, env)?;
let new_branch_name = create_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
.ok_or_else(|| anyhow!("No branch name provided"))?; // TODO
let pg_version = create_match
.get_one::<u32>("pg-version")

View File

@@ -590,7 +590,6 @@ impl Endpoint {
remote_extensions,
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
primary_is_running: None,
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

View File

@@ -210,25 +210,6 @@ impl PageServerNode {
update_config: bool,
register: bool,
) -> anyhow::Result<()> {
// Register the node with the storage controller before starting pageserver: pageserver must be registered to
// successfully call /re-attach and finish starting up.
if register {
let attachment_service = AttachmentService::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
attachment_service
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(
@@ -267,6 +248,23 @@ impl PageServerNode {
)
.await?;
if register {
let attachment_service = AttachmentService::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
attachment_service
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}
Ok(())
}
@@ -391,6 +389,11 @@ impl PageServerNode {
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
gc_feedback: settings
.remove("gc_feedback")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
lazy_slru_download: settings
.remove("lazy_slru_download")
@@ -496,6 +499,11 @@ impl PageServerNode {
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
gc_feedback: settings
.remove("gc_feedback")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
lazy_slru_download: settings
.remove("lazy_slru_download")

View File

@@ -79,12 +79,6 @@ pub struct ComputeSpec {
// Stripe size for pageserver sharding, in pages
#[serde(default)]
pub shard_stripe_size: Option<usize>,
// When we are starting a new replica in hot standby mode,
// we need to know if the primary is running.
// This is used to determine if replica should wait for
// RUNNING_XACTS from primary or not.
pub primary_is_running: Option<bool>,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.

View File

@@ -18,7 +18,6 @@ enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true
hex.workspace = true
humantime.workspace = true
thiserror.workspace = true
humantime-serde.workspace = true
chrono.workspace = true

View File

@@ -1,7 +1,4 @@
pub mod partitioning;
pub mod utilization;
pub use utilization::PageserverUtilization;
use std::{
collections::HashMap,
@@ -283,6 +280,7 @@ pub struct TenantConfig {
pub eviction_policy: Option<EvictionPolicy>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub gc_feedback: Option<bool>,
pub heatmap_period: Option<String>,
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
@@ -339,7 +337,7 @@ impl ThrottleConfig {
}
/// The requests per second allowed by the given config.
pub fn steady_rps(&self) -> f64 {
(self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
(self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64()) / 1e3
}
}

View File

@@ -1,70 +0,0 @@
use std::time::SystemTime;
/// Pageserver current utilization and scoring for how good candidate the pageserver would be for
/// the next tenant.
///
/// See and maintain pageserver openapi spec for `/v1/utilization_score` as the truth.
///
/// `format: int64` fields must use `ser_saturating_u63` because openapi generated clients might
/// not handle full u64 values properly.
#[derive(serde::Serialize, Debug)]
pub struct PageserverUtilization {
/// Used disk space
#[serde(serialize_with = "ser_saturating_u63")]
pub disk_usage_bytes: u64,
/// Free disk space
#[serde(serialize_with = "ser_saturating_u63")]
pub free_space_bytes: u64,
/// Lower is better score for how good candidate for a next tenant would this pageserver be.
#[serde(serialize_with = "ser_saturating_u63")]
pub utilization_score: u64,
/// When was this snapshot captured, pageserver local time.
///
/// Use millis to give confidence that the value is regenerated often enough.
#[serde(serialize_with = "ser_rfc3339_millis")]
pub captured_at: SystemTime,
}
fn ser_rfc3339_millis<S: serde::Serializer>(
ts: &SystemTime,
serializer: S,
) -> Result<S::Ok, S::Error> {
serializer.collect_str(&humantime::format_rfc3339_millis(*ts))
}
/// openapi knows only `format: int64`, so avoid outputting a non-parseable value by generated clients.
///
/// Instead of newtype, use this because a newtype would get require handling deserializing values
/// with the highest bit set which is properly parsed by serde formats, but would create a
/// conundrum on how to handle and again serialize such values at type level. It will be a few
/// years until we can use more than `i64::MAX` bytes on a disk.
fn ser_saturating_u63<S: serde::Serializer>(value: &u64, serializer: S) -> Result<S::Ok, S::Error> {
const MAX_FORMAT_INT64: u64 = i64::MAX as u64;
let value = (*value).min(MAX_FORMAT_INT64);
serializer.serialize_u64(value)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
#[test]
fn u64_max_is_serialized_as_u63_max() {
let doc = PageserverUtilization {
disk_usage_bytes: u64::MAX,
free_space_bytes: 0,
utilization_score: u64::MAX,
captured_at: SystemTime::UNIX_EPOCH + Duration::from_secs(1708509779),
};
let s = serde_json::to_string(&doc).unwrap();
let expected = r#"{"disk_usage_bytes":9223372036854775807,"free_space_bytes":0,"utilization_score":9223372036854775807,"captured_at":"2024-02-21T10:02:59.000Z"}"#;
assert_eq!(s, expected);
}
}

View File

@@ -80,9 +80,6 @@ pub const XLOG_XACT_ABORT: u8 = 0x20;
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
// From standbydefs.h
pub const XLOG_RUNNING_XACTS: u8 = 0x10;
// From srlu.h
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
pub const SLRU_SEG_SIZE: usize = BLCKSZ as usize * SLRU_PAGES_PER_SEGMENT as usize;

View File

@@ -119,6 +119,11 @@ pub fn generate_pg_control(
// Generate new pg_control needed for bootstrap
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
//reset some fields we don't want to preserve
//TODO Check this.
//We may need to determine the value from twophase data.
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;

View File

@@ -1,7 +1,7 @@
use std::{
borrow::Cow,
fs::{self, File},
io::{self, Write},
io,
};
use camino::{Utf8Path, Utf8PathBuf};
@@ -161,48 +161,6 @@ pub async fn durable_rename(
Ok(())
}
/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`].
///
/// The file is first written to the specified `tmp_path`, and in a second
/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
/// and atomic rename guarantee that, if we crash at any point, there will never
/// be a partially written file at `final_path` (but maybe at `tmp_path`).
///
/// Callers are responsible for serializing calls of this function for a given `final_path`.
/// If they don't, there may be an error due to conflicting `tmp_path`, or there will
/// be no error and the content of `final_path` will be the "winner" caller's `content`.
/// I.e., the atomticity guarantees still hold.
pub fn overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true)
.open(tmp_path)?;
file.write_all(content)?;
file.sync_all()?;
drop(file); // don't keep the fd open for longer than we have to
std::fs::rename(tmp_path, final_path)?;
let final_parent_dirfd = std::fs::OpenOptions::new()
.read(true)
.open(final_path_parent)?;
final_parent_dirfd.sync_all()?;
Ok(())
}
#[cfg(test)]
mod tests {

View File

@@ -39,7 +39,7 @@ use crate::tenant::{
};
use crate::virtual_file;
use crate::{
IGNORED_TENANT_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
};
@@ -140,6 +140,7 @@ pub mod defaults {
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#gc_feedback = false
#heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
#secondary_download_concurrency = {DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY}
@@ -825,6 +826,17 @@ impl PageServerConf {
.join(connection_id.to_string())
}
/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
pub fn metadata_path(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Utf8PathBuf {
self.timeline_path(tenant_shard_id, timeline_id)
.join(METADATA_FILE_NAME)
}
/// Turns storage remote path of a file into its local path.
pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
remote_path.with_base(&self.workdir)

View File

@@ -234,7 +234,7 @@ impl DeletionHeader {
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
let header_path = conf.deletion_header_path();
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
.await
.maybe_fatal_err("save deletion header")?;
@@ -325,8 +325,7 @@ impl DeletionList {
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)

View File

@@ -567,6 +567,114 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/attach:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
post:
description: |
Schedules attach operation to happen in the background for the given tenant.
As soon as the caller sends this request, it must assume the pageserver
starts writing to the tenant's S3 state unless it receives one of the
distinguished errors below that state otherwise.
If a client receives a not-distinguished response, e.g., a network timeout,
it MUST retry the /attach request and poll again for the tenant's
attachment status.
After the client has received a 202, it MUST poll the tenant's
attachment status (field `attachment_status`) to reach state `attached`.
If the `attachment_status` is missing, the client MUST retry the `/attach`
request (goto previous paragraph). This is a robustness measure in case the tenant
status endpoint is buggy, but the attach operation is ongoing.
There is no way to cancel an in-flight request.
In any case, the client
* MUST NOT ASSUME that the /attach request has been lost in the network,
* MUST NOT ASSUME that the request has been lost, based on the observation
that a subsequent tenant status request returns 404. The request may
still be in flight. It must be retried.
The client SHOULD supply a `TenantConfig` for the tenant in the request body.
Settings specified in the config override the pageserver's defaults.
It is guaranteed that the config settings are applied before the pageserver
starts operating on the tenant. E.g., if the config specifies a specific
PITR interval for a tenant, then that setting will be in effect before the
pageserver starts the garbage collection loop. This enables a client to
guarantee a specific PITR setting across detach/attach cycles.
The pageserver will reject the request if it cannot parse the config, or
if there are any unknown fields in it.
If the client does not supply a config, the pageserver will use its defaults.
This behavior is deprecated: https://github.com/neondatabase/neon/issues/4282
requestBody:
required: false
content:
application/json:
schema:
$ref: "#/components/schemas/TenantAttachRequest"
responses:
"202":
description: Tenant attaching scheduled
"400":
description: Bad Request
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"404":
description: Timeline not found
content:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"409":
description: |
The tenant is already known to Pageserver in some way,
and hence this `/attach` call has been rejected.
Some examples of how this can happen:
- tenant was created on this pageserver
- tenant attachment was started by an earlier call to `/attach`.
Callers should poll the tenant status's `attachment_status` field,
like for status 202. See the longer description for `POST /attach`
for details.
content:
application/json:
schema:
$ref: "#/components/schemas/ConflictError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/location_config:
parameters:
- name: tenant_id
@@ -662,6 +770,66 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/detach:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
- name: detach_ignored
in: query
required: false
schema:
type: boolean
description: |
When true, allow to detach a tenant which state is ignored.
post:
description: |
Remove tenant data (including all corresponding timelines) from pageserver's memory and file system.
Files on the remote storage are not affected.
responses:
"200":
description: Tenant detached
"400":
description: Error when no tenant id found in path parameters
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"404":
description: Tenant not found
content:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/ignore:
parameters:
- name: tenant_id
@@ -1211,25 +1379,6 @@ paths:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/utilization:
get:
description: |
Returns the pageservers current utilization and fitness score for new tenants.
responses:
"200":
description: Pageserver utilization and fitness score
content:
application/json:
schema:
$ref: "#/components/schemas/PageserverUtilization"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
components:
securitySchemes:
JWT:
@@ -1296,6 +1445,16 @@ components:
generation:
type: integer
description: Attachment generation number.
TenantAttachRequest:
type: object
required:
- config
properties:
config:
$ref: '#/components/schemas/TenantConfig'
generation:
type: integer
description: Attachment generation number.
TenantConfigRequest:
allOf:
- $ref: '#/components/schemas/TenantConfig'
@@ -1532,33 +1691,6 @@ components:
type: string
enum: [past, present, future, nodata]
PageserverUtilization:
type: object
required:
- disk_usage_bytes
- free_space_bytes
- utilization_score
properties:
disk_usage_bytes:
type: integer
format: int64
minimum: 0
description: The amount of disk space currently utilized by layer files.
free_space_bytes:
type: integer
format: int64
minimum: 0
description: The amount of usable disk space left.
utilization_score:
type: integer
format: int64
minimum: 0
maximum: 9223372036854775807
default: 9223372036854775807
description: |
Lower is better score for how good this pageserver would be for the next tenant.
The default or maximum value can be returned in situations when a proper score cannot (yet) be calculated.
Error:
type: object
required:

View File

@@ -100,7 +100,6 @@ pub struct State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
latest_utilization: tokio::sync::Mutex<Option<(std::time::Instant, bytes::Bytes)>>,
}
impl State {
@@ -129,7 +128,6 @@ impl State {
disk_usage_eviction_state,
deletion_queue_client,
secondary_controller,
latest_utilization: Default::default(),
})
}
}
@@ -1965,54 +1963,6 @@ async fn put_io_engine_handler(
json_response(StatusCode::OK, ())
}
/// Polled by control plane.
///
/// See [`crate::utilization`].
async fn get_utilization(
r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
// this probably could be completely public, but lets make that change later.
check_permission(&r, None)?;
let state = get_state(&r);
let mut g = state.latest_utilization.lock().await;
let regenerate_every = Duration::from_secs(1);
let still_valid = g
.as_ref()
.is_some_and(|(captured_at, _)| captured_at.elapsed() < regenerate_every);
// avoid needless statvfs calls even though those should be non-blocking fast.
// regenerate at most 1Hz to allow polling at any rate.
if !still_valid {
let path = state.conf.tenants_path();
let doc = crate::utilization::regenerate(path.as_std_path())
.map_err(ApiError::InternalServerError)?;
let mut buf = Vec::new();
serde_json::to_writer(&mut buf, &doc)
.context("serialize")
.map_err(ApiError::InternalServerError)?;
let body = bytes::Bytes::from(buf);
*g = Some((std::time::Instant::now(), body));
}
// hyper 0.14 doesn't yet have Response::clone so this is a bit of extra legwork
let cached = g.as_ref().expect("just set").1.clone();
Response::builder()
.header(hyper::http::header::CONTENT_TYPE, "application/json")
// thought of using http date header, but that is second precision which does not give any
// debugging aid
.status(StatusCode::OK)
.body(hyper::Body::from(cached))
.context("build response")
.map_err(ApiError::InternalServerError)
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -2274,6 +2224,5 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.any(handler_404))
}

View File

@@ -22,7 +22,6 @@ pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod trace;
pub mod utilization;
pub mod virtual_file;
pub mod walingest;
pub mod walrecord;
@@ -169,6 +168,15 @@ pub fn is_delete_mark(path: &Utf8Path) -> bool {
ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
}
fn is_walkdir_io_not_found(e: &walkdir::Error) -> bool {
if let Some(e) = e.io_error() {
if e.kind() == std::io::ErrorKind::NotFound {
return true;
}
}
false
}
/// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
/// blocking.
///

View File

@@ -642,6 +642,26 @@ pub(crate) static TENANT_SYNTHETIC_SIZE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|
.expect("Failed to register pageserver_tenant_synthetic_cached_size_bytes metric")
});
// Metrics for cloud upload. These metrics reflect data uploaded to cloud storage,
// or in testing they estimate how much we would upload if we did.
static NUM_PERSISTENT_FILES_CREATED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_created_persistent_files_total",
"Number of files created that are meant to be uploaded to cloud storage",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static PERSISTENT_BYTES_WRITTEN: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_written_persistent_bytes_total",
"Total bytes written that are meant to be uploaded to cloud storage",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static EVICTION_ITERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_eviction_iteration_duration_seconds_global",
@@ -1782,6 +1802,8 @@ pub(crate) struct TimelineMetrics {
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>>,
pub num_persistent_files_created: IntCounter,
pub persistent_bytes_written: IntCounter,
pub evictions: IntCounter,
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
}
@@ -1863,6 +1885,12 @@ impl TimelineMetrics {
};
let directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>> =
Lazy::new(Box::new(directory_entries_count_gauge_closure));
let num_persistent_files_created = NUM_PERSISTENT_FILES_CREATED
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let persistent_bytes_written = PERSISTENT_BYTES_WRITTEN
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let evictions = EVICTIONS
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
@@ -1884,6 +1912,8 @@ impl TimelineMetrics {
resident_physical_size_gauge,
current_logical_size_gauge,
directory_entries_count_gauge,
num_persistent_files_created,
persistent_bytes_written,
evictions,
evictions_with_low_residence_duration: std::sync::RwLock::new(
evictions_with_low_residence_duration,
@@ -1893,6 +1923,8 @@ impl TimelineMetrics {
pub(crate) fn record_new_file_metrics(&self, sz: u64) {
self.resident_physical_size_add(sz);
self.num_persistent_files_created.inc_by(1);
self.persistent_bytes_written.inc_by(sz);
}
pub(crate) fn resident_physical_size_sub(&self, sz: u64) {
@@ -1925,6 +1957,9 @@ impl Drop for TimelineMetrics {
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
let _ = metric.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
}
let _ =
NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
let _ = EVICTIONS.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
self.evictions_with_low_residence_duration

View File

@@ -29,6 +29,7 @@ use remote_storage::TimeoutOrCancel;
use std::fmt;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::runtime::Handle;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -171,6 +172,9 @@ pub(crate) mod throttle;
pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
// re-export for use in remote_timeline_client.rs
pub use crate::tenant::metadata::save_metadata;
// re-export for use in walreceiver
pub use crate::tenant::timeline::WalReceiverInfo;
@@ -1147,6 +1151,17 @@ impl Tenant {
None
};
// timeline loading after attach expects to find metadata file for each metadata
save_metadata(
self.conf,
&self.tenant_shard_id,
&timeline_id,
&remote_metadata,
)
.await
.context("save_metadata")
.map_err(LoadLocalTimelineError::Load)?;
self.timeline_init_and_sync(
timeline_id,
resources,
@@ -2573,24 +2588,19 @@ impl Tenant {
legacy_config_path: &Utf8Path,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
// Forward compat: write out an old-style configuration that old versions can read, in case we roll back
Self::persist_tenant_config_legacy(
tenant_shard_id,
legacy_config_path,
&location_conf.tenant_conf,
)
.await?;
if let LocationMode::Attached(attach_conf) = &location_conf.mode {
// The modern-style LocationConf config file requires a generation to be set. In case someone
// is running a pageserver without the infrastructure to set generations, write out the legacy-style
// config file that only contains TenantConf.
//
// This will eventually be removed in https://github.com/neondatabase/neon/issues/5388
// Once we use LocationMode, generations are mandatory. If we aren't using generations,
// then drop out after writing legacy-style config.
if attach_conf.generation.is_none() {
tracing::info!(
"Running without generations, writing legacy-style tenant config file"
);
Self::persist_tenant_config_legacy(
tenant_shard_id,
legacy_config_path,
&location_conf.tenant_conf,
)
.await?;
tracing::debug!("Running without generations, not writing new-style LocationConf");
return Ok(());
}
}
@@ -2613,10 +2623,17 @@ impl Tenant {
let tenant_shard_id = *tenant_shard_id;
let config_path = config_path.to_owned();
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {config_path}")
})
})
})
.await??;
Ok(())
}
@@ -2643,12 +2660,17 @@ impl Tenant {
let tenant_shard_id = *tenant_shard_id;
let target_config_path = target_config_path.to_owned();
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})?;
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})
})
})
.await??;
Ok(())
}
@@ -3271,7 +3293,10 @@ impl Tenant {
timeline_struct.init_empty_layer_map(start_lsn);
if let Err(e) = self.create_timeline_files(&uninit_mark.timeline_path).await {
if let Err(e) = self
.create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
.await
{
error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark);
return Err(e);
@@ -3288,13 +3313,26 @@ impl Tenant {
))
}
async fn create_timeline_files(&self, timeline_path: &Utf8Path) -> anyhow::Result<()> {
async fn create_timeline_files(
&self,
timeline_path: &Utf8Path,
new_timeline_id: &TimelineId,
new_metadata: &TimelineMetadata,
) -> anyhow::Result<()> {
crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
});
save_metadata(
self.conf,
&self.tenant_shard_id,
new_timeline_id,
new_metadata,
)
.await
.context("Failed to create timeline metadata")?;
Ok(())
}
@@ -3646,6 +3684,7 @@ pub(crate) mod harness {
evictions_low_residence_duration_metric_threshold: Some(
tenant_conf.evictions_low_residence_duration_metric_threshold,
),
gc_feedback: Some(tenant_conf.gc_feedback),
heatmap_period: Some(tenant_conf.heatmap_period),
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
timeline_get_throttle: Some(tenant_conf.timeline_get_throttle),

View File

@@ -339,6 +339,7 @@ pub struct TenantConf {
// See the corresponding metric's help string.
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
pub gc_feedback: bool,
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
/// may be disabled if a Tenant will not have secondary locations: only secondary
@@ -426,6 +427,10 @@ pub struct TenantConfOpt {
#[serde(default)]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub gc_feedback: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
@@ -480,6 +485,7 @@ impl TenantConfOpt {
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period),
lazy_slru_download: self
.lazy_slru_download
@@ -524,6 +530,7 @@ impl Default for TenantConf {
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
gc_feedback: false,
heatmap_period: Duration::ZERO,
lazy_slru_download: false,
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
@@ -596,6 +603,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
evictions_low_residence_duration_metric_threshold: value
.evictions_low_residence_duration_metric_threshold
.map(humantime),
gc_feedback: value.gc_feedback,
heatmap_period: value.heatmap_period.map(humantime),
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),

View File

@@ -61,6 +61,8 @@ use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;
pub(crate) use self::historic_layer_coverage::RebuildVersion;
use super::storage_layer::PersistentLayerDesc;
///
@@ -500,7 +502,7 @@ impl LayerMap {
///
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
pub(self) fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
self.historic
.remove(historic_layer_coverage::LayerKey::from(layer_desc));
let layer_key = layer_desc.key();
@@ -525,6 +527,10 @@ impl LayerMap {
self.historic.rebuild();
}
pub fn get_rebuild_version(&self) -> RebuildVersion {
self.historic.get_rebuild_version()
}
/// Is there a newer image layer for given key- and LSN-range? Or a set
/// of image layers within the specified lsn range that cover the entire
/// specified key range?

View File

@@ -413,6 +413,8 @@ fn test_persistent_overlapping() {
/// See this for more on persistent and retroactive techniques:
/// <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
pub struct BufferedHistoricLayerCoverage<Value> {
rebuild_version: RebuildVersion,
/// A persistent layer map that we rebuild when we need to retroactively update
historic_coverage: HistoricLayerCoverage<Value>,
@@ -438,9 +440,33 @@ impl<T: Clone> Default for BufferedHistoricLayerCoverage<T> {
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RebuildVersion(u64);
impl RebuildVersion {
fn inc(&mut self) {
self.0
.checked_add(1)
.expect("at current clock cycles, we won't hit this");
}
}
impl Default for RebuildVersion {
fn default() -> Self {
RebuildVersion(1)
}
}
impl std::fmt::Display for RebuildVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
pub fn new() -> Self {
Self {
rebuild_version: RebuildVersion::default(),
historic_coverage: HistoricLayerCoverage::<Value>::new(),
buffer: BTreeMap::new(),
layers: BTreeMap::new(),
@@ -462,6 +488,8 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
None => return, // No need to rebuild if buffer is empty
};
self.rebuild_version.inc();
// Apply buffered updates to self.layers
let num_updates = self.buffer.len();
self.buffer.retain(|layer_key, layer| {
@@ -493,11 +521,17 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
// TODO maybe only warn if ratio is at least 10
info!(
version = %self.rebuild_version,
"Rebuilt layer map. Did {} insertions to process a batch of {} updates.",
num_inserted, num_updates,
num_inserted,
num_updates,
)
}
pub fn get_rebuild_version(&self) -> RebuildVersion {
self.rebuild_version
}
/// Iterate all the layers
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,

View File

@@ -8,11 +8,20 @@
//!
//! [`remote_timeline_client`]: super::remote_timeline_client
use anyhow::ensure;
use std::io::{self};
use anyhow::{ensure, Context};
use pageserver_api::shard::TenantShardId;
use serde::{de::Error, Deserialize, Serialize, Serializer};
use thiserror::Error;
use utils::bin_ser::SerializeError;
use utils::crashsafe::path_with_suffix_extension;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn};
use crate::config::PageServerConf;
use crate::virtual_file::VirtualFile;
use crate::TEMP_FILE_SUFFIX;
/// Use special format number to enable backward compatibility.
const METADATA_FORMAT_VERSION: u16 = 4;
@@ -259,6 +268,32 @@ impl Serialize for TimelineMetadata {
}
}
/// Save timeline metadata to file
#[tracing::instrument(skip_all, fields(%tenant_id=tenant_shard_id.tenant_id, %shard_id=tenant_shard_id.shard_slug(), %timeline_id))]
pub async fn save_metadata(
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
data: &TimelineMetadata,
) -> anyhow::Result<()> {
let path = conf.metadata_path(tenant_shard_id, timeline_id);
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
.await
.context("write metadata")?;
Ok(())
}
#[derive(Error, Debug)]
pub enum LoadMetadataError {
#[error(transparent)]
Read(#[from] io::Error),
#[error(transparent)]
Decode(#[from] anyhow::Error),
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -42,7 +42,7 @@ use crate::tenant::config::{
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext::PathExt;
@@ -359,6 +359,12 @@ fn load_tenant_config(
return Ok(None);
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
return Ok(None);
}
let tenant_shard_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
@@ -371,59 +377,6 @@ fn load_tenant_config(
}
};
// Clean up legacy `metadata` files.
// Doing it here because every single tenant directory is visited here.
// In any later code, there's different treatment of tenant dirs
// ... depending on whether the tenant is in re-attach response or not
// ... epending on whether the tenant is ignored or not
assert_eq!(
&conf.tenant_path(&tenant_shard_id),
&tenant_dir_path,
"later use of conf....path() methods would be dubious"
);
let timelines: Vec<TimelineId> = match conf.timelines_path(&tenant_shard_id).read_dir_utf8() {
Ok(iter) => {
let mut timelines = Vec::new();
for res in iter {
let p = res?;
let Some(timeline_id) = p.file_name().parse::<TimelineId>().ok() else {
// skip any entries that aren't TimelineId, such as
// - *.___temp dirs
// - unfinished initdb uploads (test_non_uploaded_root_timeline_is_deleted_after_restart)
continue;
};
timelines.push(timeline_id);
}
timelines
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => vec![],
Err(e) => return Err(anyhow::anyhow!(e)),
};
for timeline_id in timelines {
let timeline_path = &conf.timeline_path(&tenant_shard_id, &timeline_id);
let metadata_path = timeline_path.join(METADATA_FILE_NAME);
match std::fs::remove_file(&metadata_path) {
Ok(()) => {
crashsafe::fsync(timeline_path)
.context("fsync timeline dir after removing legacy metadata file")?;
info!("removed legacy metadata file at {metadata_path}");
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// something removed the file earlier, or it was never there
// We don't care, this software version doesn't write it again, so, we're good.
}
Err(e) => {
anyhow::bail!("remove legacy metadata file: {e}: {metadata_path}");
}
}
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
return Ok(None);
}
Ok(Some((
tenant_shard_id,
Tenant::load_tenant_config(conf, &tenant_shard_id),

View File

@@ -37,7 +37,6 @@ use crate::tenant::{
remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
};
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::Future;
use pageserver_api::shard::TenantShardId;
@@ -45,7 +44,7 @@ use rand::Rng;
use remote_storage::{DownloadError, GenericRemoteStorage};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, warn, Instrument};
use tracing::{info_span, instrument, Instrument};
use utils::{
backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
};
@@ -491,9 +490,14 @@ impl<'a> TenantDownloader<'a> {
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
let heatmap_path_bg = heatmap_path.clone();
VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
.await
.maybe_fatal_err(&context_msg)?;
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
})
})
.await
.expect("Blocking task is never aborted")
.maybe_fatal_err(&context_msg)?;
tracing::debug!("Wrote local heatmap to {}", heatmap_path);
@@ -774,33 +778,19 @@ async fn init_timeline_state(
.await
.fatal_err(&format!("Listing {timeline_path}"))
{
let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
continue;
};
let local_meta = dentry
.metadata()
.await
.fatal_err(&format!("Read metadata on {}", file_path));
let dentry_file_name = dentry.file_name();
let file_name = dentry_file_name.to_string_lossy();
let local_meta = dentry.metadata().await.fatal_err(&format!(
"Read metadata on {}",
dentry.path().to_string_lossy()
));
let file_name = file_path.file_name().expect("created it from the dentry");
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
if file_name == METADATA_FILE_NAME {
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
continue;
} else if crate::is_temporary(&file_path) {
// Temporary files are frequently left behind from restarting during downloads
tracing::info!("Cleaning up temporary file {file_path}");
if let Err(e) = tokio::fs::remove_file(&file_path)
.await
.or_else(fs_ext::ignore_not_found)
{
tracing::error!("Failed to remove temporary file {file_path}: {e}");
}
continue;
}
match LayerFileName::from_str(file_name) {
match LayerFileName::from_str(&file_name) {
Ok(name) => {
let remote_meta = heatmap_metadata.get(&name);
match remote_meta {

View File

@@ -54,7 +54,7 @@ use crate::pgdatadir_mapping::DirectoryKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
metadata::{save_metadata, TimelineMetadata},
par_fsync,
};
use crate::{
@@ -76,7 +76,7 @@ use crate::{
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
};
@@ -111,10 +111,10 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{layer_map, remote_timeline_client::RemoteTimelineClient};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -210,6 +210,21 @@ pub struct Timeline {
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
/// State that [`Self::create_image_layers`] keeps across invocations to determine if it can
/// skip an invocation becaucse nothing changed.
create_image_layers_skipper_state: std::sync::Mutex<Option<CreateImageLayersParams>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
/// It is used by compaction task when it checks if new image layer should be created.
/// Newly created image layer doesn't help to remove the delta layer, until the
/// newly created image layer falls off the PITR horizon. So on next GC cycle,
/// gc_timeline may still want the new image layer to be created. To avoid redundant
/// image layers creation we should check if image layer exists but beyond PITR horizon.
/// This is why we need remember GC cutoff LSN.
///
wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
@@ -291,8 +306,9 @@ pub struct Timeline {
// though let's keep them both for better error visibility.
pub initdb_lsn: Lsn,
/// When did we last calculate the partitioning?
partitioning: tokio::sync::Mutex<(KeyPartitioning, Lsn)>,
/// Used by [`Timline::repartition`] to avoid re-computing partitioning on every iteration.
/// Must bump [`CompactionKeyspacePartitioning::version`] when changing.
partitioning: tokio::sync::Mutex<CompactionKeyspacePartitioning>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -334,7 +350,7 @@ pub struct Timeline {
///
/// Must only be taken in two places:
/// - [`Timeline::compact`] (this file)
/// - [`delete::delete_local_timeline_directory`]
/// - [`delete::delete_local_layer_files`]
///
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
compaction_lock: tokio::sync::Mutex<()>,
@@ -343,7 +359,7 @@ pub struct Timeline {
///
/// Must only be taken in two places:
/// - [`Timeline::gc`] (this file)
/// - [`delete::delete_local_timeline_directory`]
/// - [`delete::delete_local_layer_files`]
///
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
gc_lock: tokio::sync::Mutex<()>,
@@ -389,6 +405,57 @@ pub struct GcInfo {
pub pitr_cutoff: Lsn,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct CreateImageLayersParams {
/// From [`layer_map::LayerMap::get_rebuild_version`].
layer_map_version: layer_map::RebuildVersion,
/// From [`Timeline::partitioning`].
partitioning_version: CompactionKeyspacePartitioningVersion,
}
/// The version of the keyspace partitioning maintained in [`Timeline::partitioning`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CompactionKeyspacePartitioningVersion(u64);
impl CompactionKeyspacePartitioningVersion {
fn inc(&mut self) {
self.0
.checked_add(1)
.expect("at current clock cycles, we won't hit this");
}
}
#[derive(Clone)]
struct CompactionKeyspacePartitioning {
version: CompactionKeyspacePartitioningVersion,
partitioning: KeyPartitioning,
lsn: Lsn,
}
struct CompactionKeyspacePartition<'a> {
key_space: &'a KeySpace,
lsn: Lsn,
}
impl CompactionKeyspacePartitioning {
pub fn update(&mut self, partitioning: KeyPartitioning, lsn: Lsn) {
assert!(self.lsn <= lsn);
self.version.inc();
self.partitioning = partitioning;
self.lsn = lsn;
}
pub fn iter(&self) -> impl Iterator<Item = CompactionKeyspacePartition<'_>> {
self.partitioning
.parts
.iter()
.map(|key_space| CompactionKeyspacePartition {
key_space,
lsn: self.lsn,
})
}
}
/// An error happened in a get() operation.
#[derive(thiserror::Error, Debug)]
pub(crate) enum PageReconstructError {
@@ -1143,7 +1210,7 @@ impl Timeline {
)
.await
{
Ok((partitioning, lsn)) => {
Ok(partitioning) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::extend(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
@@ -1157,7 +1224,7 @@ impl Timeline {
// 3. Create new image layers for partitions that have been modified
// "enough".
let layers = self
.create_image_layers(&partitioning, lsn, false, &image_ctx)
.create_image_layers(&partitioning, false, &image_ctx)
.await
.map_err(anyhow::Error::from)?;
if let Some(remote_client) = &self.remote_client {
@@ -1505,6 +1572,13 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
fn get_gc_feedback(&self) -> bool {
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.gc_feedback
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
@@ -1578,6 +1652,7 @@ impl Timeline {
shard_identity,
pg_version,
layers: Default::default(),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
walreceiver: Mutex::new(None),
@@ -1640,8 +1715,15 @@ impl Timeline {
// initial logical size is 0.
LogicalSize::empty_initial()
},
partitioning: tokio::sync::Mutex::new((KeyPartitioning::new(), Lsn(0))),
partitioning: tokio::sync::Mutex::new(CompactionKeyspacePartitioning {
// The other fields don't matter because this is 0 and hence repartition(),
// will calculate a new one
lsn: Lsn(0),
version: CompactionKeyspacePartitioningVersion(0),
partitioning: KeyPartitioning::new(),
}),
repartition_threshold: 0,
create_image_layers_skipper_state: std::sync::Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
@@ -1826,11 +1908,7 @@ impl Timeline {
discovered_layers.push((file_name, file_size));
continue;
}
Discovered::Metadata => {
warn!("found legacy metadata file, these should have been removed in load_tenant_config");
continue;
}
Discovered::IgnoredBackup => {
Discovered::Metadata | Discovered::IgnoredBackup => {
continue;
}
Discovered::Unknown(file_name) => {
@@ -2337,7 +2415,7 @@ impl Timeline {
fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
if !self
.conf
.timeline_path(&self.tenant_shard_id, &self.timeline_id)
.metadata_path(&self.tenant_shard_id, &self.timeline_id)
.exists()
{
error!("timeline-calculate-logical-size-pre metadata file does not exist")
@@ -3136,7 +3214,7 @@ impl Timeline {
}
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let (partitioning, _lsn) = self
let partitioning = self
.repartition(
self.initdb_lsn,
self.get_compaction_target_size(),
@@ -3151,8 +3229,7 @@ impl Timeline {
// For image layers, we add them immediately into the layer map.
(
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
.await?,
self.create_image_layers(&partitioning, true, ctx).await?,
None,
)
} else {
@@ -3192,7 +3269,7 @@ impl Timeline {
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now. The flushed layer is stored in
// the mapping in `create_delta_layer`.
{
let metadata = {
let mut guard = self.layers.write().await;
if self.cancel.is_cancelled() {
@@ -3206,7 +3283,9 @@ impl Timeline {
self.disk_consistent_lsn.store(disk_consistent_lsn);
// Schedule remote uploads that will reflect our new disk_consistent_lsn
self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
} else {
None
}
// release lock on 'layers'
};
@@ -3221,6 +3300,22 @@ impl Timeline {
// This failpoint is used by another test case `test_pageserver_recovery`.
fail_point!("flush-frozen-exit");
// Update the metadata file, with new 'disk_consistent_lsn'
//
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
// *all* the layers, to avoid fsyncing the file multiple times.
// If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
if let Some(metadata) = metadata {
save_metadata(
self.conf,
&self.tenant_shard_id,
&self.timeline_id,
&metadata,
)
.await
.context("save_metadata")?;
}
Ok(())
}
@@ -3276,6 +3371,25 @@ impl Timeline {
Ok(metadata)
}
async fn update_metadata_file(
&self,
disk_consistent_lsn: Lsn,
layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
) -> anyhow::Result<()> {
let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
save_metadata(
self.conf,
&self.tenant_shard_id,
&self.timeline_id,
&metadata,
)
.await
.context("save_metadata")?;
Ok(())
}
pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
if let Some(remote_client) = &self.remote_client {
remote_client
@@ -3353,19 +3467,19 @@ impl Timeline {
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
) -> anyhow::Result<CompactionKeyspacePartitioning> {
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timelien.
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
// and hence before the compaction task starts.
anyhow::bail!("repartition() called concurrently, this should not happen");
};
if lsn < partitioning_guard.1 {
if lsn < partitioning_guard.lsn {
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
}
let distance = lsn.0 - partitioning_guard.1 .0;
if partitioning_guard.1 != Lsn(0)
let distance = lsn.0 - partitioning_guard.lsn.0;
if partitioning_guard.lsn != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
@@ -3374,25 +3488,56 @@ impl Timeline {
threshold = self.repartition_threshold,
"no repartitioning needed"
);
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
return Ok(partitioning_guard.clone());
}
let keyspace = self.collect_keyspace(lsn, ctx).await?;
let partitioning = keyspace.partition(partition_size);
*partitioning_guard = (partitioning, lsn);
partitioning_guard.update(partitioning, lsn);
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
Ok(partitioning_guard.clone())
}
// Is it time to create a new image layer for the given partition?
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
async fn time_for_new_image_layer(
&self,
&CompactionKeyspacePartition {
key_space: partition,
lsn,
}: &CompactionKeyspacePartition<'_>,
) -> bool {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read().await;
let layers = guard.layer_map();
let mut max_deltas = 0;
{
let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
let img_range =
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
if wanted.overlaps(&img_range) {
//
// gc_timeline only pays attention to image layers that are older than the GC cutoff,
// but create_image_layers creates image layers at last-record-lsn.
// So it's possible that gc_timeline wants a new image layer to be created for a key range,
// but the range is already covered by image layers at more recent LSNs. Before we
// create a new image layer, check if the range is already covered at more recent LSNs.
if !layers
.image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
{
debug!(
"Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
img_range.start, img_range.end, cutoff_lsn, lsn
);
return true;
}
}
}
}
for part_range in &partition.ranges {
let image_coverage = layers.image_coverage(part_range, lsn);
for (img_range, last_img) in image_coverage {
@@ -3436,14 +3581,14 @@ impl Timeline {
false
}
#[tracing::instrument(skip_all, fields(%lsn, %force))]
#[tracing::instrument(skip_all, fields(lsn=%partitioning.lsn, %force))]
async fn create_image_layers(
self: &Arc<Timeline>,
partitioning: &KeyPartitioning,
lsn: Lsn,
partitioning: &CompactionKeyspacePartitioning,
force: bool,
ctx: &RequestContext,
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
// REVIEW: should we not even bump this timer if we're taking short exit?
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers = Vec::new();
@@ -3458,9 +3603,30 @@ impl Timeline {
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
let mut start = Key::MIN;
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
if !force && !self.time_for_new_image_layer(partition, lsn).await {
let new_inputs = {
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map();
CreateImageLayersParams {
layer_map_version: layer_map.get_rebuild_version(),
partitioning_version: partitioning.version,
}
};
let last_inputs = {
// bail out early if nothing changed
let last = self.create_image_layers_skipper_state.lock().unwrap();
match (force, &*last, &new_inputs) {
(false, Some(last), new_inputs) if last == new_inputs => {
debug!(?last, ?new_inputs, "inputs to image layer creation algorithm have not changed since last invocation");
return Ok(image_layers);
}
_ => (), // we'll update .last once we finish with success
}
*last
};
for partition in partitioning.iter() {
let img_range = start..partition.key_space.ranges.last().unwrap().end;
if !force && !self.time_for_new_image_layer(&partition).await {
start = img_range.end;
continue;
}
@@ -3470,7 +3636,7 @@ impl Timeline {
self.timeline_id,
self.tenant_shard_id,
&img_range,
lsn,
partitioning.lsn,
)
.await?;
@@ -3483,7 +3649,7 @@ impl Timeline {
let mut wrote_keys = false;
let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
for range in &partition.key_space.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
@@ -3507,7 +3673,11 @@ impl Timeline {
|| last_key_in_range
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.get_vectored(
key_request_accum.consume_keyspace(),
partitioning.lsn,
ctx,
)
.await?;
for (img_key, img) in results {
@@ -3563,6 +3733,12 @@ impl Timeline {
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
}
}
// All layers that the GC wanted us to create have now been created.
//
// It's possible that another GC cycle happened while we were compacting, and added
// something new to wanted_image_layers, and we now clear that before processing it.
// That's OK, because the next GC iteration will put it back in.
*self.wanted_image_layers.lock().unwrap() = None;
// Sync the new layer to disk before adding it to the layer map, to make sure
// we don't garbage collect something based on the new layer, before it has
@@ -3590,6 +3766,25 @@ impl Timeline {
.context("fsync of timeline dir")?;
}
// Remember the inputs so that we take the early exit next compaction iteration
// if nothing changed in the meantime.
// NB: since we created `new_inputs`, the layer map might have changed and hence
// `get_rebuild_version()` might have advanced already, e.g., by a
// concurrent garbage collection iteration that removed layers. That's ok, next
// `new_inputs` will observe an increased `get_rebuild_version()` and run again.
// Same goes for the modifications that we're doing below: if `image_layers` is
// not empty, we'll insert them into the layer map in the code below, which
// will bump `get_rebuild_version()`, which will make us re-run once more.
//
{
let mut state = self.create_image_layers_skipper_state.lock().unwrap();
if *state != last_inputs {
warn!(?last_inputs, ?new_inputs, observed=?*state, "unexpected: create_image_layers called concurrently? not caching inputs");
} else {
*state = Some(new_inputs);
}
}
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right
@@ -4472,6 +4667,7 @@ impl Timeline {
debug!("retain_lsns: {:?}", retain_lsns);
let mut layers_to_remove = Vec::new();
let mut wanted_image_layers = KeySpaceRandomAccum::default();
// Scan all layers in the timeline (remote or on-disk).
//
@@ -4553,6 +4749,15 @@ impl Timeline {
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
{
debug!("keeping {} because it is the latest layer", l.filename());
// Collect delta key ranges that need image layers to allow garbage
// collecting the layers.
// It is not so obvious whether we need to propagate information only about
// delta layers. Image layers can form "stairs" preventing old image from been deleted.
// But image layers are in any case less sparse than delta layers. Also we need some
// protection from replacing recent image layers with new one after each GC iteration.
if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
wanted_image_layers.add_range(l.get_key_range());
}
result.layers_not_updated += 1;
continue 'outer;
}
@@ -4565,13 +4770,24 @@ impl Timeline {
);
layers_to_remove.push(l);
}
self.wanted_image_layers
.lock()
.unwrap()
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
if !layers_to_remove.is_empty() {
// Persist the new GC cutoff value before we actually remove anything.
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
//
// This does not in fact have any effect as we no longer consider local metadata unless
// running without remote storage.
//
// This unconditionally schedules also an index_part.json update, even though, we will
// be doing one a bit later with the unlinked gc'd layers.
let disk_consistent_lsn = self.disk_consistent_lsn.load();
self.schedule_uploads(disk_consistent_lsn, None)?;
//
// TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
self.update_metadata_file(self.disk_consistent_lsn.load(), None)
.await?;
let gc_layers = layers_to_remove
.iter()

View File

@@ -6,7 +6,7 @@ use std::{
use anyhow::Context;
use pageserver_api::{models::TimelineState, shard::TenantShardId};
use tokio::sync::OwnedMutexGuard;
use tracing::{debug, error, info, instrument, Instrument};
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::{crashsafe, fs_ext, id::TimelineId};
use crate::{
@@ -124,7 +124,7 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi
/// No timeout here, GC & Compaction should be responsive to the
/// `TimelineState::Stopping` change.
// pub(super): documentation link
pub(super) async fn delete_local_timeline_directory(
pub(super) async fn delete_local_layer_files(
conf: &PageServerConf,
tenant_shard_id: TenantShardId,
timeline: &Timeline,
@@ -149,6 +149,8 @@ pub(super) async fn delete_local_timeline_directory(
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we guarantee crash-safety by persising delete mark file.
//
// Note that here we do not bail out on std::io::ErrorKind::NotFound.
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
@@ -156,21 +158,72 @@ pub(super) async fn delete_local_timeline_directory(
//
// ErrorKind::NotFound can also happen if we race with tenant detach, because,
// no locks are shared.
tokio::fs::remove_dir_all(local_timeline_directory)
.await
.or_else(fs_ext::ignore_not_found)
.context("remove local timeline directory")?;
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
//
// Note that metadata removal is skipped, this is not technically needed,
// but allows to reuse timeline loading code during resumed deletion.
// (we always expect that metadata is in place when timeline is being loaded)
// Make sure previous deletions are ordered before mark removal.
// Otherwise there is no guarantee that they reach the disk before mark deletion.
// So its possible for mark to reach disk first and for other deletions
// to be reordered later and thus missed if a crash occurs.
// Note that we dont need to sync after mark file is removed
// because we can tolerate the case when mark file reappears on startup.
let timeline_path = conf.timelines_path(&tenant_shard_id);
crashsafe::fsync_async(timeline_path)
.await
.context("fsync_pre_mark_remove")?;
#[cfg(feature = "testing")]
let mut counter = 0;
// Timeline directory may not exist if we failed to delete mark file and request was retried.
if !local_timeline_directory.exists() {
return Ok(());
}
let metadata_path = conf.metadata_path(&tenant_shard_id, &timeline.timeline_id);
for entry in walkdir::WalkDir::new(&local_timeline_directory).contents_first(true) {
#[cfg(feature = "testing")]
{
counter += 1;
if counter == 2 {
fail::fail_point!("timeline-delete-during-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-during-rm"))?
});
}
}
let entry = entry?;
if entry.path() == metadata_path {
debug!("found metadata, skipping");
continue;
}
if entry.path() == local_timeline_directory {
// Keeping directory because metedata file is still there
debug!("found timeline dir itself, skipping");
continue;
}
let metadata = match entry.metadata() {
Ok(metadata) => metadata,
Err(e) => {
if crate::is_walkdir_io_not_found(&e) {
warn!(
timeline_dir=?local_timeline_directory,
path=?entry.path().display(),
"got not found err while removing timeline dir, proceeding anyway"
);
continue;
}
anyhow::bail!(e);
}
};
if metadata.is_dir() {
warn!(path=%entry.path().display(), "unexpected directory under timeline dir");
tokio::fs::remove_dir(entry.path()).await
} else {
tokio::fs::remove_file(entry.path()).await
}
.with_context(|| format!("Failed to remove: {}", entry.path().display()))?;
}
info!("finished deleting layer files, releasing locks");
drop(guards);
@@ -201,6 +254,39 @@ async fn cleanup_remaining_timeline_fs_traces(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
// Remove local metadata
tokio::fs::remove_file(conf.metadata_path(&tenant_shard_id, &timeline_id))
.await
.or_else(fs_ext::ignore_not_found)
.context("remove metadata")?;
fail::fail_point!("timeline-delete-after-rm-metadata", |_| {
Err(anyhow::anyhow!(
"failpoint: timeline-delete-after-rm-metadata"
))?
});
// Remove timeline dir
tokio::fs::remove_dir(conf.timeline_path(&tenant_shard_id, &timeline_id))
.await
.or_else(fs_ext::ignore_not_found)
.context("timeline dir")?;
fail::fail_point!("timeline-delete-after-rm-dir", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm-dir"))?
});
// Make sure previous deletions are ordered before mark removal.
// Otherwise there is no guarantee that they reach the disk before mark deletion.
// So its possible for mark to reach disk first and for other deletions
// to be reordered later and thus missed if a crash occurs.
// Note that we dont need to sync after mark file is removed
// because we can tolerate the case when mark file reappears on startup.
let timeline_path = conf.timelines_path(&tenant_shard_id);
crashsafe::fsync_async(timeline_path)
.await
.context("fsync_pre_mark_remove")?;
// Remove delete mark
// TODO: once we are confident that no more exist in the field, remove this
// line. It cleans up a legacy marker file that might in rare cases be present.
@@ -466,12 +552,15 @@ impl DeleteTimelineFlow {
tenant: &Tenant,
timeline: &Timeline,
) -> Result<(), DeleteTimelineError> {
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await?;
delete_local_layer_files(conf, tenant.tenant_shard_id, timeline).await?;
delete_remote_layers_and_index(timeline).await?;
pausable_failpoint!("in_progress_delete");
cleanup_remaining_timeline_fs_traces(conf, tenant.tenant_shard_id, timeline.timeline_id)
.await?;
remove_timeline_from_tenant(tenant, timeline.timeline_id, &guard).await?;
*guard = Self::Finished;

View File

@@ -130,7 +130,7 @@ pub(super) struct UploadQueueStopped {
pub(crate) enum NotInitialized {
#[error("queue is in state Uninitialized")]
Uninitialized,
#[error("queue is in state Stopped")]
#[error("queue is in state Stopping")]
Stopped,
#[error("queue is shutting down")]
ShuttingDown,

View File

@@ -1,38 +0,0 @@
//! An utilization metric which is used to decide on which pageserver to put next tenant.
//!
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain it's openapi spec as the
//! truth.
use anyhow::Context;
use std::path::Path;
use pageserver_api::models::PageserverUtilization;
pub(crate) fn regenerate(tenants_path: &Path) -> anyhow::Result<PageserverUtilization> {
// TODO: currently the http api ratelimits this to 1Hz at most, which is probably good enough
let statvfs = nix::sys::statvfs::statvfs(tenants_path)
.map_err(std::io::Error::from)
.context("statvfs tenants directory")?;
let blocksz = statvfs.block_size();
#[cfg_attr(not(target_os = "macos"), allow(clippy::unnecessary_cast))]
let free = statvfs.blocks_available() as u64 * blocksz;
let used = crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.get();
let captured_at = std::time::SystemTime::now();
let doc = PageserverUtilization {
disk_usage_bytes: used,
free_space_bytes: free,
// lower is better; start with a constant
//
// note that u64::MAX will be output as i64::MAX as u64, but that should not matter
utilization_score: u64::MAX,
captured_at,
};
// TODO: make utilization_score into a metric
Ok(doc)
}

View File

@@ -19,13 +19,14 @@ use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
@@ -403,34 +404,47 @@ impl VirtualFile {
Ok(vfile)
}
/// Async version of [`::utils::crashsafe::overwrite`].
/// Writes a file to the specified `final_path` in a crash safe fasion
///
/// # NB:
///
/// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
/// it did at an earlier time.
/// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
/// The file is first written to the specified tmp_path, and in a second
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite<B: BoundedBuf>(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: B,
) -> std::io::Result<()> {
// TODO: use tokio_epoll_uring if configured as `io_engine`.
// See https://github.com/neondatabase/neon/issues/6663
tokio::task::spawn_blocking(move || {
let slice_storage;
let content_len = content.bytes_init();
let content = if content.bytes_init() > 0 {
slice_storage = Some(content.slice(0..content_len));
slice_storage.as_deref().expect("just set it to Some()")
} else {
&[]
};
utils::crashsafe::overwrite(&final_path, &tmp_path, content)
})
.await
.expect("blocking task is never aborted")
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true),
)
.await?;
let (_content, res) = file.write_all(content).await;
res?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
final_parent_dirfd.sync_all().await?;
Ok(())
}
/// Call File::sync_all() on the underlying File.
@@ -1323,7 +1337,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1332,7 +1346,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1354,7 +1368,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();

View File

@@ -334,12 +334,6 @@ impl WalIngest {
{
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
}
trace!(
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
@@ -366,13 +360,6 @@ impl WalIngest {
}
}
}
pg_constants::RM_STANDBY_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
}
}
_x => {
// TODO: should probably log & fail here instead of blindly
// doing something without understanding the protocol

View File

@@ -773,42 +773,6 @@ impl XlLogicalMessage {
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlRunningXacts {
pub xcnt: u32,
pub subxcnt: u32,
pub subxid_overflow: bool,
pub next_xid: TransactionId,
pub oldest_running_xid: TransactionId,
pub latest_completed_xid: TransactionId,
pub xids: Vec<TransactionId>,
}
impl XlRunningXacts {
pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
let xcnt = buf.get_u32_le();
let subxcnt = buf.get_u32_le();
let subxid_overflow = buf.get_u32_le() != 0;
let next_xid = buf.get_u32_le();
let oldest_running_xid = buf.get_u32_le();
let latest_completed_xid = buf.get_u32_le();
let mut xids = Vec::new();
for _ in 0..(xcnt + subxcnt) {
xids.push(buf.get_u32_le());
}
XlRunningXacts {
xcnt,
subxcnt,
subxid_overflow,
next_xid,
oldest_running_xid,
latest_completed_xid,
xids,
}
}
}
/// Main routine to decode a WAL record and figure out which blocks are modified
//
// See xlogrecord.h for details

View File

@@ -21,7 +21,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql
DATA = neon--1.0.sql neon--1.0--1.1.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \

View File

@@ -35,16 +35,16 @@
#include "utils/memutils.h"
#include "utils/jsonb.h"
#include "neon_utils.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
static const char *jwt_token = NULL;
/* GUCs */
static char *ConsoleURL = NULL;
static bool ForwardDDL = true;
/* Curl structures for sending the HTTP requests */
static CURL *CurlHandle;
static struct curl_slist *ContentHeader = NULL;
/*
* CURL docs say that this buffer must exist until we call curl_easy_cleanup
* (which we never do), so we make this a static
@@ -226,8 +226,6 @@ ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
static void
SendDeltasToControlPlane()
{
static CURL *handle = NULL;
if (!RootTable.db_table && !RootTable.role_table)
return;
if (!ConsoleURL)
@@ -238,57 +236,29 @@ SendDeltasToControlPlane()
if (!ForwardDDL)
return;
if (handle == NULL)
{
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Content-Type: application/json");
if (headers == NULL)
{
elog(ERROR, "Failed to set Content-Type header");
}
if (jwt_token)
{
char auth_header[8192];
snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", jwt_token);
headers = curl_slist_append(headers, auth_header);
if (headers == NULL)
{
elog(ERROR, "Failed to set Authorization header");
}
}
handle = alloc_curl_handle();
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "PATCH");
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(handle, CURLOPT_URL, ConsoleURL);
curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, CurlErrorBuf);
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 3L /* seconds */ );
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, ErrorWriteCallback);
}
char *message = ConstructDeltaMessage();
ErrorString str;
ErrorString str = {};
str.size = 0;
curl_easy_setopt(handle, CURLOPT_POSTFIELDS, message);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &str);
curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "PATCH");
curl_easy_setopt(CurlHandle, CURLOPT_HTTPHEADER, ContentHeader);
curl_easy_setopt(CurlHandle, CURLOPT_POSTFIELDS, message);
curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL);
curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf);
curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ );
curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &str);
curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ErrorWriteCallback);
const int num_retries = 5;
CURLcode curl_status;
int curl_status;
for (int i = 0; i < num_retries; i++)
{
if ((curl_status = curl_easy_perform(handle)) == 0)
if ((curl_status = curl_easy_perform(CurlHandle)) == 0)
break;
elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf);
pg_usleep(1000 * 1000);
}
if (curl_status != CURLE_OK)
if (curl_status != 0)
{
elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf);
}
@@ -296,11 +266,13 @@ SendDeltasToControlPlane()
{
long response_code;
if (curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION)
if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION)
{
bool error_exists = str.size != 0;
if (response_code != 200)
{
if (str.size != 0)
if (error_exists)
{
elog(ERROR,
"Received HTTP code %ld from control plane: %s",
@@ -863,10 +835,34 @@ InitControlPlaneConnector()
NULL,
NULL);
jwt_token = getenv("NEON_CONTROL_PLANE_TOKEN");
const char *jwt_token = getenv("NEON_CONTROL_PLANE_TOKEN");
if (!jwt_token)
{
elog(LOG, "Missing NEON_CONTROL_PLANE_TOKEN environment variable, forwarding will not be authenticated");
}
if (curl_global_init(CURL_GLOBAL_DEFAULT))
{
elog(ERROR, "Failed to initialize curl");
}
if ((CurlHandle = curl_easy_init()) == NULL)
{
elog(ERROR, "Failed to initialize curl handle");
}
if ((ContentHeader = curl_slist_append(ContentHeader, "Content-Type: application/json")) == NULL)
{
elog(ERROR, "Failed to initialize content header");
}
if (jwt_token)
{
char auth_header[8192];
snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", jwt_token);
if ((ContentHeader = curl_slist_append(ContentHeader, auth_header)) == NULL)
{
elog(ERROR, "Failed to initialize authorization header");
}
}
}

View File

@@ -14,8 +14,6 @@
#include "utils/guc.h"
#include "neon_utils.h"
static int extension_server_port = 0;
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
@@ -33,19 +31,15 @@ static download_extension_file_hook_type prev_download_extension_file_hook = NUL
static bool
neon_download_extension_file_http(const char *filename, bool is_library)
{
static CURL *handle = NULL;
CURL *curl;
CURLcode res;
char *compute_ctl_url;
char *postdata;
bool ret = false;
if (handle == NULL)
if ((curl = curl_easy_init()) == NULL)
{
handle = alloc_curl_handle();
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 3L /* seconds */ );
elog(ERROR, "Failed to initialize curl handle");
}
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
@@ -53,22 +47,28 @@ neon_download_extension_file_http(const char *filename, bool is_library)
elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url);
curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */ );
/* Perform the request, res will get the return code */
res = curl_easy_perform(handle);
/* Check for errors */
if (res == CURLE_OK)
if (curl)
{
ret = true;
}
else
{
/*
* Don't error here because postgres will try to find the file and will
* fail with some proper error message if it's not found.
*/
elog(WARNING, "neon_download_extension_file_http failed: %s\n", curl_easy_strerror(res));
/* Perform the request, res will get the return code */
res = curl_easy_perform(curl);
/* Check for errors */
if (res == CURLE_OK)
{
ret = true;
}
else
{
/* Don't error here because postgres will try to find the file */
/* and will fail with some proper error message if it's not found. */
elog(WARNING, "neon_download_extension_file_http failed: %s\n", curl_easy_strerror(res));
}
/* always cleanup */
curl_easy_cleanup(curl);
}
return ret;

View File

@@ -18,10 +18,6 @@
#include <sys/file.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include "neon_pgversioncompat.h"
@@ -66,18 +62,6 @@
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define BLKDISCARD _IO(0x12, 119)
int ioctl_discard(int fd, int64_t offset, int64_t len)
{
uint64_t range[2] = {(uint64_t)offset, (uint64_t)len};
int ret = ioctl(fd, BLKDISCARD, range);
if (ret < 0) {
return errno;
}
return 0;
}
typedef struct FileCacheEntry
{
BufferTag key;
@@ -107,7 +91,6 @@ static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static bool lfc_raw_block_device;
static char *lfc_path;
static FileCacheControl *lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
@@ -166,12 +149,7 @@ lfc_disable(char const *op)
* We need to use unlink to to avoid races in LFC write, because it is not
* protectedby
*/
if (lfc_raw_block_device) {
ioctl_discard(lfc_desc, 0, SIZE_MB_TO_CHUNKS(lfc_max_size) * BLOCKS_PER_CHUNK * BLCKSZ);
} else {
unlink(lfc_path);
}
unlink(lfc_path);
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
@@ -328,18 +306,10 @@ lfc_change_limit_hook(int newval, void *extra)
FileCacheEntry *victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
Assert(victim->access_count == 0);
if (lfc_raw_block_device) {
if (ioctl_discard(lfc_desc, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0) {
neon_log(LOG, "Failed to discard on raw block device: %m");
}
}
else
{
#ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
neon_log(LOG, "Failed to punch hole in file: %m");
#endif
}
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
lfc_ctl->used -= 1;
}
@@ -389,17 +359,6 @@ lfc_init(void)
lfc_change_limit_hook,
NULL);
DefineCustomBoolVariable("neon.file_cache_raw_block_device",
"Set local file cache to raw block device mode.",
NULL,
&lfc_raw_block_device,
false, /* disabled by default */
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("neon.file_cache_path",
"Path to local file cache (can be raw device)",
NULL,

View File

@@ -1,29 +0,0 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.2'" to load this file. \quit
-- Create a convenient view similar to pg_stat_database
-- that exposes all lfc stat values in one row.
CREATE OR REPLACE VIEW NEON_STAT_FILE_CACHE AS
WITH lfc_stats AS (
SELECT
stat_name,
count
FROM neon_get_lfc_stats() AS t(stat_name text, count bigint)
),
lfc_values AS (
SELECT
MAX(CASE WHEN stat_name = 'file_cache_misses' THEN count ELSE NULL END) AS file_cache_misses,
MAX(CASE WHEN stat_name = 'file_cache_hits' THEN count ELSE NULL END) AS file_cache_hits,
MAX(CASE WHEN stat_name = 'file_cache_used' THEN count ELSE NULL END) AS file_cache_used,
MAX(CASE WHEN stat_name = 'file_cache_writes' THEN count ELSE NULL END) AS file_cache_writes,
-- Calculate the file_cache_hit_ratio within the same CTE for simplicity
CASE
WHEN MAX(CASE WHEN stat_name = 'file_cache_misses' THEN count ELSE 0 END) + MAX(CASE WHEN stat_name = 'file_cache_hits' THEN count ELSE 0 END) = 0 THEN NULL
ELSE ROUND((MAX(CASE WHEN stat_name = 'file_cache_hits' THEN count ELSE 0 END)::DECIMAL /
(MAX(CASE WHEN stat_name = 'file_cache_hits' THEN count ELSE 0 END) + MAX(CASE WHEN stat_name = 'file_cache_misses' THEN count ELSE 0 END))) * 100, 2)
END AS file_cache_hit_ratio
FROM lfc_stats
)
SELECT file_cache_misses, file_cache_hits, file_cache_used, file_cache_writes, file_cache_hit_ratio from lfc_values;
-- externalize the view to all users in role pg_monitor
GRANT SELECT ON NEON_STAT_FILE_CACHE TO PG_MONITOR;

View File

@@ -37,8 +37,7 @@
PG_MODULE_MAGIC;
void _PG_init(void);
static int logical_replication_max_snap_files = 300;
bool primary_is_running = false;
static int logical_replication_max_time_lag = 3600;
static void
InitLogicalReplicationMonitor(void)
@@ -46,14 +45,14 @@ InitLogicalReplicationMonitor(void)
BackgroundWorker bgw;
DefineCustomIntVariable(
"neon.logical_replication_max_snap_files",
"Maximum allowed logical replication .snap files",
NULL,
&logical_replication_max_snap_files,
300, 0, INT_MAX,
PGC_SIGHUP,
0,
NULL, NULL, NULL);
"neon.logical_replication_max_time_lag",
"Threshold for dropping unused logical replication slots",
NULL,
&logical_replication_max_time_lag,
3600, 0, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_S,
NULL, NULL, NULL);
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
@@ -69,99 +68,22 @@ InitLogicalReplicationMonitor(void)
RegisterBackgroundWorker(&bgw);
}
static int
LsnDescComparator(const void *a, const void *b)
typedef struct
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 < lsn2)
return 1;
else if (lsn1 == lsn2)
return 0;
else
return -1;
}
/*
* Look at .snap files and calculate minimum allowed restart_lsn of slot so that
* next gc would leave not more than logical_replication_max_snap_files; all
* slots having lower restart_lsn should be dropped.
*/
static XLogRecPtr
get_num_snap_files_lsn_threshold(void)
{
DIR *dirdesc;
struct dirent *de;
char *snap_path = "pg_logical/snapshots/";
int cnt = 0;
int lsns_allocated = 1024;
int lsns_num = 0;
XLogRecPtr *lsns;
XLogRecPtr cutoff;
if (logical_replication_max_snap_files < 0)
return 0;
lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
/* find all .snap files and get their lsns */
dirdesc = AllocateDir(snap_path);
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
{
XLogRecPtr lsn;
uint32 hi;
uint32 lo;
if (strcmp(de->d_name, ".") == 0 ||
strcmp(de->d_name, "..") == 0)
continue;
if (sscanf(de->d_name, "%X-%X.snap", &hi, &lo) != 2)
{
ereport(LOG,
(errmsg("could not parse file name as .snap file \"%s\"", de->d_name)));
continue;
}
lsn = ((uint64) hi) << 32 | lo;
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
if (lsns_allocated == lsns_num)
{
lsns_allocated *= 2;
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
}
lsns[lsns_num++] = lsn;
}
/* sort by lsn desc */
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
/* and take cutoff at logical_replication_max_snap_files */
if (logical_replication_max_snap_files > lsns_num)
cutoff = 0;
/* have less files than cutoff */
else
{
cutoff = lsns[logical_replication_max_snap_files - 1];
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
}
pfree(lsns);
FreeDir(dirdesc);
return cutoff;
}
#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */
NameData name;
bool dropped;
XLogRecPtr confirmed_flush_lsn;
TimestampTz last_updated;
} SlotStatus;
/*
* Unused logical replication slots pins WAL and prevents deletion of snapshots.
* WAL bloat is guarded by max_slot_wal_keep_size; this bgw removes slots which
* need too many .snap files.
*/
PGDLLEXPORT void
LogicalSlotsMonitorMain(Datum main_arg)
{
TimestampTz now,
last_checked;
SlotStatus* slots;
TimestampTz now, last_checked;
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
@@ -170,105 +92,75 @@ LogicalSlotsMonitorMain(Datum main_arg)
BackgroundWorkerUnblockSignals();
slots = (SlotStatus*)calloc(max_replication_slots, sizeof(SlotStatus));
last_checked = GetCurrentTimestamp();
for (;;)
{
XLogRecPtr cutoff_lsn;
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
logical_replication_max_time_lag*1000/2,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/*
* If there are too many .snap files, just drop all logical slots to
* prevent aux files bloat.
*/
cutoff_lsn = get_num_snap_files_lsn_threshold();
if (cutoff_lsn > 0)
now = GetCurrentTimestamp();
if (now - last_checked > logical_replication_max_time_lag*USECS_PER_SEC)
{
int n_active_slots = 0;
last_checked = now;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (int i = 0; i < max_replication_slots; i++)
{
char slot_name[NAMEDATALEN];
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
/* find the name */
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
/* Consider only logical repliction slots */
if (!s->in_use || !SlotIsLogical(s))
continue;
if (s->active_pid != 0)
{
LWLockRelease(ReplicationSlotControlLock);
n_active_slots += 1;
continue;
}
/* do we need to drop it? */
SpinLockAcquire(&s->mutex);
restart_lsn = s->data.restart_lsn;
SpinLockRelease(&s->mutex);
if (restart_lsn >= cutoff_lsn)
/* Check if there was some activity with the slot since last check */
if (s->data.confirmed_flush != slots[i].confirmed_flush_lsn)
{
LWLockRelease(ReplicationSlotControlLock);
continue;
slots[i].confirmed_flush_lsn = s->data.confirmed_flush;
slots[i].last_updated = now;
}
strlcpy(slot_name, s->data.name.data, NAMEDATALEN);
elog(LOG, "ls_monitor: dropping slot %s with restart_lsn %X/%X below horizon %X/%X",
slot_name, LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(cutoff_lsn));
LWLockRelease(ReplicationSlotControlLock);
/* now try to drop it, killing owner before if any */
for (;;)
else if (now - slots[i].last_updated > logical_replication_max_time_lag*USECS_PER_SEC)
{
pid_t active_pid;
slots[i].name = s->data.name;
slots[i].dropped = true;
}
}
LWLockRelease(ReplicationSlotControlLock);
SpinLockAcquire(&s->mutex);
active_pid = s->active_pid;
SpinLockRelease(&s->mutex);
if (active_pid == 0)
/*
* If there are no active subscriptions, then no new snapshots are generated
* and so no need to force slot deletion.
*/
if (n_active_slots != 0)
{
for (int i = 0; i < max_replication_slots; i++)
{
if (slots[i].dropped)
{
/*
* Slot is releasted, try to drop it. Though of course
* it could have been reacquired, so drop can ERROR
* out. Similarly it could have been dropped in the
* meanwhile.
*
* In principle we could remove pg_try/pg_catch, that
* would restart the whole bgworker.
*/
ConditionVariableCancelSleep();
PG_TRY();
{
ReplicationSlotDrop(slot_name, true);
elog(LOG, "ls_monitor: slot %s dropped", slot_name);
}
PG_CATCH();
{
/* log ERROR and reset elog stack */
EmitErrorReport();
FlushErrorState();
elog(LOG, "ls_monitor: failed to drop slot %s", slot_name);
}
PG_END_TRY();
break;
}
else
{
/* kill the owner and wait for release */
elog(LOG, "ls_monitor: killing slot %s owner %d", slot_name, active_pid);
(void) kill(active_pid, SIGTERM);
/* We shouldn't get stuck, but to be safe add timeout. */
ConditionVariableTimedSleep(&s->active_cv, 1000, WAIT_EVENT_REPLICATION_SLOT_DROP);
elog(LOG, "Drop logical replication slot because it was not update more than %ld seconds",
(now - slots[i].last_updated)/USECS_PER_SEC);
ReplicationSlotDrop(slots[i].name.data, true);
slots[i].dropped = false;
}
}
}
}
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
LS_MONITOR_CHECK_INTERVAL,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
}
void
_PG_init(void)
{
@@ -289,15 +181,6 @@ _PG_init(void)
pg_init_extension_server();
DefineCustomBoolVariable(
"neon.primary_is_running",
"true if the primary was running at replica startup. false otherwise",
NULL,
&primary_is_running,
false,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the

View File

@@ -1,6 +1,5 @@
# neon extension
comment = 'cloud storage for PostgreSQL'
default_version = '1.2'
default_version = '1.1'
module_pathname = '$libdir/neon'
relocatable = true
trusted = true

View File

@@ -1,8 +1,5 @@
#include <sys/resource.h>
#ifndef WALPROPOSER_LIB
#include <curl/curl.h>
#endif
#include <sys/resource.h>
#include "postgres.h"
@@ -117,48 +114,3 @@ disable_core_dump()
fprintf(stderr, "WARNING: disable cores setrlimit failed: %s", strerror(save_errno));
}
}
#ifndef WALPROPOSER_LIB
/*
* On macOS with a libcurl that has IPv6 support, curl_global_init() calls
* SCDynamicStoreCopyProxies(), which makes the program multithreaded. An ideal
* place to call curl_global_init() would be _PG_init(), but Neon has to be
* added to shared_preload_libraries, which are loaded in the Postmaster
* process. The Postmaster is not supposed to become multithreaded at any point
* in its lifecycle. Postgres doesn't have any good hook that I know of to
* initialize per-backend structures, so we have to check this on any
* allocation of a CURL handle.
*
* Free the allocated CURL handle with curl_easy_cleanup(3).
*
* https://developer.apple.com/documentation/systemconfiguration/1517088-scdynamicstorecopyproxies
*/
CURL *
alloc_curl_handle(void)
{
static bool curl_initialized = false;
CURL *handle;
if (unlikely(!curl_initialized))
{
/* Protected by mutex internally */
if (curl_global_init(CURL_GLOBAL_DEFAULT))
{
elog(ERROR, "Failed to initialize curl");
}
curl_initialized = true;
}
handle = curl_easy_init();
if (handle == NULL)
{
elog(ERROR, "Failed to initialize curl handle");
}
return handle;
}
#endif

View File

@@ -1,12 +1,6 @@
#ifndef __NEON_UTILS_H__
#define __NEON_UTILS_H__
#include "lib/stringinfo.h"
#ifndef WALPROPOSER_LIB
#include <curl/curl.h>
#endif
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
@@ -14,10 +8,4 @@ void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
extern void disable_core_dump();
#ifndef WALPROPOSER_LIB
CURL * alloc_curl_handle(void);
#endif
#endif /* __NEON_UTILS_H__ */

77
poetry.lock generated
View File

@@ -858,43 +858,43 @@ files = [
[[package]]
name = "cryptography"
version = "42.0.4"
version = "42.0.2"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
optional = false
python-versions = ">=3.7"
files = [
{file = "cryptography-42.0.4-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:ffc73996c4fca3d2b6c1c8c12bfd3ad00def8621da24f547626bf06441400449"},
{file = "cryptography-42.0.4-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:db4b65b02f59035037fde0998974d84244a64c3265bdef32a827ab9b63d61b18"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dad9c385ba8ee025bb0d856714f71d7840020fe176ae0229de618f14dae7a6e2"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:69b22ab6506a3fe483d67d1ed878e1602bdd5912a134e6202c1ec672233241c1"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:e09469a2cec88fb7b078e16d4adec594414397e8879a4341c6ace96013463d5b"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:3e970a2119507d0b104f0a8e281521ad28fc26f2820687b3436b8c9a5fcf20d1"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:e53dc41cda40b248ebc40b83b31516487f7db95ab8ceac1f042626bc43a2f992"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:c3a5cbc620e1e17009f30dd34cb0d85c987afd21c41a74352d1719be33380885"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6bfadd884e7280df24d26f2186e4e07556a05d37393b0f220a840b083dc6a824"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:01911714117642a3f1792c7f376db572aadadbafcd8d75bb527166009c9f1d1b"},
{file = "cryptography-42.0.4-cp37-abi3-win32.whl", hash = "sha256:fb0cef872d8193e487fc6bdb08559c3aa41b659a7d9be48b2e10747f47863925"},
{file = "cryptography-42.0.4-cp37-abi3-win_amd64.whl", hash = "sha256:c1f25b252d2c87088abc8bbc4f1ecbf7c919e05508a7e8628e6875c40bc70923"},
{file = "cryptography-42.0.4-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:15a1fb843c48b4a604663fa30af60818cd28f895572386e5f9b8a665874c26e7"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a1327f280c824ff7885bdeef8578f74690e9079267c1c8bd7dc5cc5aa065ae52"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ffb03d419edcab93b4b19c22ee80c007fb2d708429cecebf1dd3258956a563a"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:1df6fcbf60560d2113b5ed90f072dc0b108d64750d4cbd46a21ec882c7aefce9"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:44a64043f743485925d3bcac548d05df0f9bb445c5fcca6681889c7c3ab12764"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:3c6048f217533d89f2f8f4f0fe3044bf0b2090453b7b73d0b77db47b80af8dff"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:6d0fbe73728c44ca3a241eff9aefe6496ab2656d6e7a4ea2459865f2e8613257"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:887623fe0d70f48ab3f5e4dbf234986b1329a64c066d719432d0698522749929"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:ce8613beaffc7c14f091497346ef117c1798c202b01153a8cc7b8e2ebaaf41c0"},
{file = "cryptography-42.0.4-cp39-abi3-win32.whl", hash = "sha256:810bcf151caefc03e51a3d61e53335cd5c7316c0a105cc695f0959f2c638b129"},
{file = "cryptography-42.0.4-cp39-abi3-win_amd64.whl", hash = "sha256:a0298bdc6e98ca21382afe914c642620370ce0470a01e1bef6dd9b5354c36854"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5f8907fcf57392cd917892ae83708761c6ff3c37a8e835d7246ff0ad251d9298"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:12d341bd42cdb7d4937b0cabbdf2a94f949413ac4504904d0cdbdce4a22cbf88"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:1cdcdbd117681c88d717437ada72bdd5be9de117f96e3f4d50dab3f59fd9ab20"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:0e89f7b84f421c56e7ff69f11c441ebda73b8a8e6488d322ef71746224c20fce"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:f1e85a178384bf19e36779d91ff35c7617c885da487d689b05c1366f9933ad74"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:d2a27aca5597c8a71abbe10209184e1a8e91c1fd470b5070a2ea60cafec35bcd"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:4e36685cb634af55e0677d435d425043967ac2f3790ec652b2b88ad03b85c27b"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:f47be41843200f7faec0683ad751e5ef11b9a56a220d57f300376cd8aba81660"},
{file = "cryptography-42.0.4.tar.gz", hash = "sha256:831a4b37accef30cccd34fcb916a5d7b5be3cbbe27268a02832c3e450aea39cb"},
{file = "cryptography-42.0.2-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:701171f825dcab90969596ce2af253143b93b08f1a716d4b2a9d2db5084ef7be"},
{file = "cryptography-42.0.2-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:61321672b3ac7aade25c40449ccedbc6db72c7f5f0fdf34def5e2f8b51ca530d"},
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ea2c3ffb662fec8bbbfce5602e2c159ff097a4631d96235fcf0fb00e59e3ece4"},
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3b15c678f27d66d247132cbf13df2f75255627bcc9b6a570f7d2fd08e8c081d2"},
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:8e88bb9eafbf6a4014d55fb222e7360eef53e613215085e65a13290577394529"},
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:a047682d324ba56e61b7ea7c7299d51e61fd3bca7dad2ccc39b72bd0118d60a1"},
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:36d4b7c4be6411f58f60d9ce555a73df8406d484ba12a63549c88bd64f7967f1"},
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:a00aee5d1b6c20620161984f8ab2ab69134466c51f58c052c11b076715e72929"},
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:b97fe7d7991c25e6a31e5d5e795986b18fbbb3107b873d5f3ae6dc9a103278e9"},
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:5fa82a26f92871eca593b53359c12ad7949772462f887c35edaf36f87953c0e2"},
{file = "cryptography-42.0.2-cp37-abi3-win32.whl", hash = "sha256:4b063d3413f853e056161eb0c7724822a9740ad3caa24b8424d776cebf98e7ee"},
{file = "cryptography-42.0.2-cp37-abi3-win_amd64.whl", hash = "sha256:841ec8af7a8491ac76ec5a9522226e287187a3107e12b7d686ad354bb78facee"},
{file = "cryptography-42.0.2-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:55d1580e2d7e17f45d19d3b12098e352f3a37fe86d380bf45846ef257054b242"},
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:28cb2c41f131a5758d6ba6a0504150d644054fd9f3203a1e8e8d7ac3aea7f73a"},
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b9097a208875fc7bbeb1286d0125d90bdfed961f61f214d3f5be62cd4ed8a446"},
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:44c95c0e96b3cb628e8452ec060413a49002a247b2b9938989e23a2c8291fc90"},
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:2f9f14185962e6a04ab32d1abe34eae8a9001569ee4edb64d2304bf0d65c53f3"},
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:09a77e5b2e8ca732a19a90c5bca2d124621a1edb5438c5daa2d2738bfeb02589"},
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:ad28cff53f60d99a928dfcf1e861e0b2ceb2bc1f08a074fdd601b314e1cc9e0a"},
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:130c0f77022b2b9c99d8cebcdd834d81705f61c68e91ddd614ce74c657f8b3ea"},
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:fa3dec4ba8fb6e662770b74f62f1a0c7d4e37e25b58b2bf2c1be4c95372b4a33"},
{file = "cryptography-42.0.2-cp39-abi3-win32.whl", hash = "sha256:3dbd37e14ce795b4af61b89b037d4bc157f2cb23e676fa16932185a04dfbf635"},
{file = "cryptography-42.0.2-cp39-abi3-win_amd64.whl", hash = "sha256:8a06641fb07d4e8f6c7dda4fc3f8871d327803ab6542e33831c7ccfdcb4d0ad6"},
{file = "cryptography-42.0.2-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:087887e55e0b9c8724cf05361357875adb5c20dec27e5816b653492980d20380"},
{file = "cryptography-42.0.2-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:a7ef8dd0bf2e1d0a27042b231a3baac6883cdd5557036f5e8df7139255feaac6"},
{file = "cryptography-42.0.2-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:4383b47f45b14459cab66048d384614019965ba6c1a1a141f11b5a551cace1b2"},
{file = "cryptography-42.0.2-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:fbeb725c9dc799a574518109336acccaf1303c30d45c075c665c0793c2f79a7f"},
{file = "cryptography-42.0.2-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:320948ab49883557a256eab46149df79435a22d2fefd6a66fe6946f1b9d9d008"},
{file = "cryptography-42.0.2-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:5ef9bc3d046ce83c4bbf4c25e1e0547b9c441c01d30922d812e887dc5f125c12"},
{file = "cryptography-42.0.2-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:52ed9ebf8ac602385126c9a2fe951db36f2cb0c2538d22971487f89d0de4065a"},
{file = "cryptography-42.0.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:141e2aa5ba100d3788c0ad7919b288f89d1fe015878b9659b307c9ef867d3a65"},
{file = "cryptography-42.0.2.tar.gz", hash = "sha256:e0ec52ba3c7f1b7d813cd52649a5b3ef1fc0d433219dc8c93827c57eab6cf888"},
]
[package.dependencies]
@@ -2182,7 +2182,6 @@ files = [
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
@@ -2572,16 +2571,6 @@ files = [
{file = "wrapt-1.14.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8ad85f7f4e20964db4daadcab70b47ab05c7c1cf2a7c1e51087bfaa83831854c"},
{file = "wrapt-1.14.1-cp310-cp310-win32.whl", hash = "sha256:a9a52172be0b5aae932bef82a79ec0a0ce87288c7d132946d645eba03f0ad8a8"},
{file = "wrapt-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:6d323e1554b3d22cfc03cd3243b5bb815a51f5249fdcbb86fda4bf62bab9e164"},
{file = "wrapt-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ecee4132c6cd2ce5308e21672015ddfed1ff975ad0ac8d27168ea82e71413f55"},
{file = "wrapt-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2020f391008ef874c6d9e208b24f28e31bcb85ccff4f335f15a3251d222b92d9"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2feecf86e1f7a86517cab34ae6c2f081fd2d0dac860cb0c0ded96d799d20b335"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:240b1686f38ae665d1b15475966fe0472f78e71b1b4903c143a842659c8e4cb9"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a9008dad07d71f68487c91e96579c8567c98ca4c3881b9b113bc7b33e9fd78b8"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6447e9f3ba72f8e2b985a1da758767698efa72723d5b59accefd716e9e8272bf"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:acae32e13a4153809db37405f5eba5bac5fbe2e2ba61ab227926a22901051c0a"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49ef582b7a1152ae2766557f0550a9fcbf7bbd76f43fbdc94dd3bf07cc7168be"},
{file = "wrapt-1.14.1-cp311-cp311-win32.whl", hash = "sha256:358fe87cc899c6bb0ddc185bf3dbfa4ba646f05b1b0b9b5a27c2cb92c2cea204"},
{file = "wrapt-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:26046cd03936ae745a502abf44dac702a5e6880b2b01c29aea8ddf3353b68224"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:43ca3bbbe97af00f49efb06e352eae40434ca9d915906f77def219b88e85d907"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:6b1a564e6cb69922c7fe3a678b9f9a3c54e72b469875aa8018f18b4d1dd1adf3"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux2010_i686.whl", hash = "sha256:00b6d4ea20a906c0ca56d84f93065b398ab74b927a7a3dbd470f6fc503f95dc3"},

View File

@@ -168,11 +168,12 @@ impl CancelClosure {
cancel_token,
}
}
/// Cancels the query running on user's compute node.
pub async fn try_cancel_query(self) -> Result<(), CancelError> {
async fn try_cancel_query(self) -> Result<(), CancelError> {
let socket = TcpStream::connect(self.socket_addr).await?;
self.cancel_token.cancel_query_raw(socket, NoTls).await?;
info!("query was cancelled");
Ok(())
}
}

View File

@@ -1,5 +1,4 @@
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::info;
use std::future::poll_fn;
use std::io;
@@ -40,51 +39,42 @@ where
}
}
#[tracing::instrument(skip_all)]
pub(super) async fn copy_bidirectional_client_compute<Client, Compute>(
client: &mut Client,
compute: &mut Compute,
pub(super) async fn copy_bidirectional<A, B>(
a: &mut A,
b: &mut B,
) -> Result<(u64, u64), std::io::Error>
where
Client: AsyncRead + AsyncWrite + Unpin + ?Sized,
Compute: AsyncRead + AsyncWrite + Unpin + ?Sized,
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
let mut client_to_compute = TransferState::Running(CopyBuffer::new());
let mut compute_to_client = TransferState::Running(CopyBuffer::new());
let mut a_to_b = TransferState::Running(CopyBuffer::new());
let mut b_to_a = TransferState::Running(CopyBuffer::new());
poll_fn(|cx| {
let mut client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)?;
let mut compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, compute, client)?;
let mut a_to_b_result = transfer_one_direction(cx, &mut a_to_b, a, b)?;
let mut b_to_a_result = transfer_one_direction(cx, &mut b_to_a, b, a)?;
// Early termination checks from compute to client.
if let TransferState::Done(_) = compute_to_client {
if let TransferState::Running(buf) = &client_to_compute {
info!("Compute is done, terminate client");
// Early termination checks
if let TransferState::Done(_) = a_to_b {
if let TransferState::Running(buf) = &b_to_a {
// Initiate shutdown
client_to_compute = TransferState::ShuttingDown(buf.amt);
client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)?;
b_to_a = TransferState::ShuttingDown(buf.amt);
b_to_a_result = transfer_one_direction(cx, &mut b_to_a, b, a)?;
}
}
// Early termination checks from compute to client.
if let TransferState::Done(_) = client_to_compute {
if let TransferState::Running(buf) = &compute_to_client {
info!("Client is done, terminate compute");
if let TransferState::Done(_) = b_to_a {
if let TransferState::Running(buf) = &a_to_b {
// Initiate shutdown
compute_to_client = TransferState::ShuttingDown(buf.amt);
compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, client, compute)?;
a_to_b = TransferState::ShuttingDown(buf.amt);
a_to_b_result = transfer_one_direction(cx, &mut a_to_b, a, b)?;
}
}
// It is not a problem if ready! returns early ... (comment remains the same)
let client_to_compute = ready!(client_to_compute_result);
let compute_to_client = ready!(compute_to_client_result);
let a_to_b = ready!(a_to_b_result);
let b_to_a = ready!(b_to_a_result);
Poll::Ready(Ok((client_to_compute, compute_to_client)))
Poll::Ready(Ok((a_to_b, b_to_a)))
})
.await
}
@@ -229,46 +219,38 @@ mod tests {
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn test_client_to_compute() {
let (mut client_client, mut client_proxy) = tokio::io::duplex(8); // Create a mock duplex stream
let (mut compute_proxy, mut compute_client) = tokio::io::duplex(32); // Create a mock duplex stream
async fn test_early_termination_a_to_d() {
let (mut a_mock, mut b_mock) = tokio::io::duplex(8); // Create a mock duplex stream
let (mut c_mock, mut d_mock) = tokio::io::duplex(32); // Create a mock duplex stream
// Simulate 'a' finishing while there's still data for 'b'
client_client.write_all(b"hello").await.unwrap();
client_client.shutdown().await.unwrap();
compute_client.write_all(b"Neon").await.unwrap();
compute_client.shutdown().await.unwrap();
a_mock.write_all(b"hello").await.unwrap();
a_mock.shutdown().await.unwrap();
d_mock.write_all(b"Neon Serverless Postgres").await.unwrap();
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
.await
.unwrap();
let result = copy_bidirectional(&mut b_mock, &mut c_mock).await.unwrap();
// Assert correct transferred amounts
let (client_to_compute_count, compute_to_client_count) = result;
assert_eq!(client_to_compute_count, 5); // 'hello' was transferred
assert_eq!(compute_to_client_count, 4); // response only partially transferred or not at all
let (a_to_d_count, d_to_a_count) = result;
assert_eq!(a_to_d_count, 5); // 'hello' was transferred
assert!(d_to_a_count <= 8); // response only partially transferred or not at all
}
#[tokio::test]
async fn test_compute_to_client() {
let (mut client_client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
let (mut compute_proxy, mut compute_client) = tokio::io::duplex(8); // Create a mock duplex stream
async fn test_early_termination_d_to_a() {
let (mut a_mock, mut b_mock) = tokio::io::duplex(32); // Create a mock duplex stream
let (mut c_mock, mut d_mock) = tokio::io::duplex(8); // Create a mock duplex stream
// Simulate 'a' finishing while there's still data for 'b'
compute_client.write_all(b"hello").await.unwrap();
compute_client.shutdown().await.unwrap();
client_client
.write_all(b"Neon Serverless Postgres")
.await
.unwrap();
d_mock.write_all(b"hello").await.unwrap();
d_mock.shutdown().await.unwrap();
a_mock.write_all(b"Neon Serverless Postgres").await.unwrap();
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
.await
.unwrap();
let result = copy_bidirectional(&mut b_mock, &mut c_mock).await.unwrap();
// Assert correct transferred amounts
let (client_to_compute_count, compute_to_client_count) = result;
assert_eq!(compute_to_client_count, 5); // 'hello' was transferred
assert!(client_to_compute_count <= 8); // response only partially transferred or not at all
let (a_to_d_count, d_to_a_count) = result;
assert_eq!(d_to_a_count, 5); // 'hello' was transferred
assert!(a_to_d_count <= 8); // response only partially transferred or not at all
}
}

View File

@@ -46,11 +46,7 @@ pub async fn proxy_pass(
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute(
&mut client,
&mut compute,
)
.await?;
let _ = crate::proxy::copy_bidirectional::copy_bidirectional(&mut client, &mut compute).await?;
Ok(())
}
@@ -67,8 +63,6 @@ pub struct ProxyPassthrough<S> {
impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
pub async fn proxy_pass(self) -> anyhow::Result<()> {
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
self.compute.cancel_closure.try_cancel_query().await?;
res
proxy_pass(self.client, self.compute.stream, self.aux).await
}
}

View File

@@ -166,10 +166,6 @@ struct Args {
/// useful for debugging.
#[arg(long)]
current_thread_runtime: bool,
/// Keep horizon for walsenders, i.e. don't remove WAL segments that are
/// still needed for existing replication connection.
#[arg(long)]
walsenders_keep_horizon: bool,
}
// Like PathBufValueParser, but allows empty string.
@@ -299,7 +295,6 @@ async fn main() -> anyhow::Result<()> {
pg_tenant_only_auth,
http_auth,
current_thread_runtime: args.current_thread_runtime,
walsenders_keep_horizon: args.walsenders_keep_horizon,
};
// initialize sentry if SENTRY_DSN is provided

View File

@@ -78,7 +78,6 @@ pub struct SafeKeeperConf {
pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
pub http_auth: Option<Arc<SwappableJwtAuth>>,
pub current_thread_runtime: bool,
pub walsenders_keep_horizon: bool,
}
impl SafeKeeperConf {
@@ -122,7 +121,6 @@ impl SafeKeeperConf {
heartbeat_timeout: Duration::new(5, 0),
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
current_thread_runtime: false,
walsenders_keep_horizon: false,
}
}
}

View File

@@ -4,7 +4,7 @@ use anyhow::{bail, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
@@ -946,12 +946,28 @@ where
}
Ok(())
}
/// Get oldest segno we still need to keep. We hold WAL till it is consumed
/// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
/// offloading.
/// While it is safe to use inmem values for determining horizon,
/// we use persistent to make possible normal states less surprising.
pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
let mut horizon_lsn = min(
self.state.remote_consistent_lsn,
self.state.peer_horizon_lsn,
);
if wal_backup_enabled {
horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
}
horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
}
}
#[cfg(test)]
mod tests {
use futures::future::BoxFuture;
use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
use postgres_ffi::WAL_SEGMENT_SIZE;
use super::*;
use crate::{

View File

@@ -136,21 +136,6 @@ impl WalSenders {
self.mutex.lock().slots.iter().flatten().cloned().collect()
}
/// Get LSN of the most lagging pageserver receiver. Return None if there are no
/// active walsenders.
pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
self.mutex
.lock()
.slots
.iter()
.flatten()
.filter_map(|s| match s.feedback {
ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
ReplicationFeedback::Standby(_) => None,
})
.min()
}
/// Get aggregated pageserver feedback.
pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
self.mutex.lock().agg_ps_feedback

View File

@@ -286,29 +286,6 @@ impl SharedState {
.cloned()
.collect()
}
/// Get oldest segno we still need to keep. We hold WAL till it is consumed
/// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
/// offloading.
/// While it is safe to use inmem values for determining horizon,
/// we use persistent to make possible normal states less surprising.
fn get_horizon_segno(
&self,
wal_backup_enabled: bool,
extra_horizon_lsn: Option<Lsn>,
) -> XLogSegNo {
let state = &self.sk.state;
use std::cmp::min;
let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
if wal_backup_enabled {
horizon_lsn = min(horizon_lsn, state.backup_lsn);
}
if let Some(extra_horizon_lsn) = extra_horizon_lsn {
horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
}
horizon_lsn.segment_number(state.server.wal_seg_size as usize)
}
}
#[derive(Debug, thiserror::Error)]
@@ -376,12 +353,6 @@ pub struct Timeline {
/// Directory where timeline state is stored.
pub timeline_dir: Utf8PathBuf,
/// Should we keep WAL on disk for active replication connections.
/// Especially useful for sharding, when different shards process WAL
/// with different speed.
// TODO: add `Arc<SafeKeeperConf>` here instead of adding each field separately.
walsenders_keep_horizon: bool,
}
impl Timeline {
@@ -415,7 +386,6 @@ impl Timeline {
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
})
}
@@ -448,7 +418,6 @@ impl Timeline {
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
})
}
@@ -848,20 +817,10 @@ impl Timeline {
bail!(TimelineError::Cancelled(self.ttid));
}
// If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
// This allows to get better read speed for pageservers that are lagging behind,
// at the cost of keeping more WAL on disk.
let replication_horizon_lsn = if self.walsenders_keep_horizon {
self.walsenders.laggard_lsn()
} else {
None
};
let horizon_segno: XLogSegNo;
let remover = {
let shared_state = self.write_shared_state().await;
horizon_segno =
shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(()); // nothing to do
}

View File

@@ -175,7 +175,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
pg_tenant_only_auth: None,
http_auth: None,
current_thread_runtime: false,
walsenders_keep_horizon: false,
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -155,23 +155,12 @@ class NeonCompare(PgCompare):
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
)
metric_filters = {
"tenant_id": str(self.tenant),
"timeline_id": str(self.timeline),
"file_kind": "layer",
"op_kind": "upload",
}
# use `started` (not `finished`) counters here, because some callers
# don't wait for upload queue to drain
metric_filters = {"tenant_id": str(self.tenant), "timeline_id": str(self.timeline)}
total_files = self.zenbenchmark.get_int_counter_value(
self.env.pageserver,
"pageserver_remote_timeline_client_calls_started_total",
metric_filters,
self.env.pageserver, "pageserver_created_persistent_files_total", metric_filters
)
total_bytes = self.zenbenchmark.get_int_counter_value(
self.env.pageserver,
"pageserver_remote_timeline_client_bytes_started_total",
metric_filters,
self.env.pageserver, "pageserver_written_persistent_bytes_total", metric_filters
)
self.zenbenchmark.record(
"data_uploaded", total_bytes / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER

View File

@@ -147,6 +147,8 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_smgr_query_seconds_sum",
"pageserver_storage_operations_seconds_count_total",
"pageserver_storage_operations_seconds_sum_total",
"pageserver_created_persistent_files_total",
"pageserver_written_persistent_bytes_total",
"pageserver_evictions_total",
"pageserver_evictions_with_low_residence_duration_total",
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,

View File

@@ -3104,8 +3104,6 @@ class Endpoint(PgProtocol):
# set small 'max_replication_write_lag' to enable backpressure
# and make tests more stable.
config_lines = ["max_replication_write_lag=15MB"] + config_lines
config_lines = ["neon.primary_is_running=on"] + config_lines
self.config(config_lines)
return self
@@ -3812,7 +3810,7 @@ def pytest_addoption(parser: Parser):
SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
r"config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)"
r"config|config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)"
)
@@ -4149,21 +4147,6 @@ def tenant_get_shards(
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
primary_lsn = Lsn(
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
)
while True:
secondary_lsn = Lsn(
secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
)
caught_up = secondary_lsn >= primary_lsn
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
if caught_up:
return
time.sleep(1)
def wait_for_last_flush_lsn(
env: NeonEnv,
endpoint: Endpoint,

View File

@@ -302,15 +302,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
self.verbose_error(res)
def tenant_list_locations(self):
res = self.get(
f"http://localhost:{self.port}/v1/location_config",
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json["tenant_shards"], list)
return res_json
def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]):
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)

View File

@@ -13,11 +13,6 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
Information about image layers needed to collect old layers should
be propagated by GC to compaction task which should take in in account
when make a decision which new image layers needs to be created.
NB: this test demonstrates the problem. The source tree contained the
`gc_feedback` mechanism for about 9 months, but, there were problems
with it and it wasn't enabled at runtime.
This PR removed the code: https://github.com/neondatabase/neon/pull/6863
"""
env = neon_env_builder.init_start()
client = env.pageserver.http_client()

View File

@@ -166,6 +166,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"threshold": "23h",
},
"evictions_low_residence_duration_metric_threshold": "2days",
"gc_feedback": True,
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"heatmap_period": "10m",

View File

@@ -3,7 +3,22 @@ import re
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
from fixtures.neon_fixtures import Endpoint, NeonEnv
def wait_caughtup(primary: Endpoint, secondary: Endpoint):
primary_lsn = primary.safe_psql_scalar(
"SELECT pg_current_wal_insert_lsn()::text", log_query=False
)
while True:
secondary_lsn = secondary.safe_psql_scalar(
"SELECT pg_last_wal_replay_lsn()", log_query=False
)
caught_up = secondary_lsn >= primary_lsn
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
if caught_up:
return
time.sleep(1)
# Check for corrupted WAL messages which might otherwise go unnoticed if
@@ -64,7 +79,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
primary.safe_psql("create table t(key int, value text)")
primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")
wait_replica_caughtup(primary, secondary)
wait_caughtup(primary, secondary)
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:

View File

@@ -1,5 +1,4 @@
import time
from functools import partial
from random import choice
from string import ascii_lowercase
@@ -11,7 +10,7 @@ from fixtures.neon_fixtures import (
wait_for_last_flush_lsn,
)
from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until
from fixtures.utils import query_scalar
def random_string(n: int):
@@ -158,51 +157,6 @@ COMMIT;
assert endpoint.safe_psql("select count(*) from pg_replication_slots")[0][0] == 1
# Test that neon.logical_replication_max_snap_files works
def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
def slot_removed(ep):
assert (
endpoint.safe_psql(
"select count(*) from pg_replication_slots where slot_name = 'stale_slot'"
)[0][0]
== 0
)
env = neon_simple_env
env.neon_cli.create_branch("test_logical_replication", "empty")
# set low neon.logical_replication_max_snap_files
endpoint = env.endpoints.create_start(
"test_logical_replication",
config_lines=["log_statement=all", "neon.logical_replication_max_snap_files=1"],
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# create obsolete slot
cur.execute("select pg_create_logical_replication_slot('stale_slot', 'pgoutput');")
assert (
endpoint.safe_psql(
"select count(*) from pg_replication_slots where slot_name = 'stale_slot'"
)[0][0]
== 1
)
# now insert some data and create and start live subscriber to create more .snap files
# (in most cases this is not needed as stale_slot snap will have higher LSN than restart_lsn anyway)
cur.execute("create table t(pk integer primary key, payload integer)")
cur.execute("create publication pub1 for table t")
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
connstr = endpoint.connstr().replace("'", "''")
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))
# Test compute start at LSN page of which starts with contrecord
# https://github.com/neondatabase/neon/issues/5749
def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):

View File

@@ -15,7 +15,7 @@ def test_migrations(neon_simple_env: NeonEnv):
endpoint.wait_for_migrations()
num_migrations = 8
num_migrations = 4
with endpoint.cursor() as cur:
cur.execute("SELECT id FROM neon_migration.migration_id")

View File

@@ -1,6 +1,5 @@
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -23,9 +22,4 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
# IMPORTANT:
# If the version has changed, the test should be updated.
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.2",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
res = cur.fetchall()
log.info(res)
assert len(res) == 1
assert len(res[0]) == 5
assert cur.fetchone() == ("1.1",)

View File

@@ -211,6 +211,7 @@ def test_ondemand_download_timetravel(neon_env_builder: NeonEnvBuilder):
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id)
client.deletion_queue_flush(execute=True)
del current_lsn
env.pageserver.stop()
env.pageserver.start()
# We've shut down the SKs, then restarted the PSes to sever all walreceiver connections;

View File

@@ -228,9 +228,9 @@ def test_remote_storage_upload_queue_retries(
tenant_id, timeline_id = env.neon_cli.create_tenant(
conf={
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{64 * 1024}",
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{64 * 1024}",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
@@ -256,24 +256,21 @@ def test_remote_storage_upload_queue_retries(
]
)
FOO_ROWS_COUNT = 4000
def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data):
# create initial set of layers & upload them with failpoints configured
for _v in range(2):
endpoint.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, {FOO_ROWS_COUNT}) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
# to ensure that GC can actually remove some layers
"VACUUM foo",
]
)
endpoint.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 20000) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
# to ensure that GC can actually remove some layers
"VACUUM foo",
]
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
def get_queued_count(file_kind, op_kind):
@@ -336,7 +333,7 @@ def test_remote_storage_upload_queue_retries(
# The churn thread doesn't make progress once it blocks on the first wait_completion() call,
# so, give it some time to wrap up.
churn_while_failpoints_active_thread.join(60)
churn_while_failpoints_active_thread.join(30)
assert not churn_while_failpoints_active_thread.is_alive()
assert churn_thread_result[0]
@@ -368,7 +365,7 @@ def test_remote_storage_upload_queue_retries(
log.info("restarting postgres to validate")
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == FOO_ROWS_COUNT
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 20000
def test_remote_timeline_client_calls_started_metric(
@@ -697,8 +694,10 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
# index upload is now hitting the failpoint, it should block the shutdown
env.pageserver.stop(immediate=True)
timeline_dir = env.pageserver.timeline_dir(env.initial_tenant, new_branch_timeline_id)
assert timeline_dir.is_dir()
local_metadata = (
env.pageserver.timeline_dir(env.initial_tenant, new_branch_timeline_id) / "metadata"
)
assert local_metadata.is_file()
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

View File

@@ -1,30 +0,0 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
def test_replication_start(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("begin")
p_cur.execute("create table t(pk integer primary key, payload integer)")
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
p_cur.execute("select txid_current()")
xid = p_cur.fetchall()[0][0]
log.info(f"Master transaction {xid}")
with env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary"
) as secondary:
wait_replica_caughtup(primary, secondary)
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
# Enforce setting hint bits for pg_class tuples.
# If master's transaction is not marked as in-progress in MVCC snapshot,
# then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible.
s_cur.execute("select * from pg_class")
p_cur.execute("commit")
wait_replica_caughtup(primary, secondary)
s_cur.execute("select * from t where pk = 1")
assert s_cur.fetchone() == (1, 0)

View File

@@ -235,6 +235,11 @@ def test_sharding_split_smoke(
all_shards = tenant_get_shards(env, tenant_id)
for tenant_shard_id, pageserver in all_shards:
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
# Restart all nodes, to check that the newly created shards are durable
for ps in env.pageservers:
ps.restart()
workload.validate()
migrate_to_pageserver_ids = list(
@@ -283,32 +288,6 @@ def test_sharding_split_smoke(
env.attachment_service.consistency_check()
# Validate pageserver state
shards_exist: list[TenantShardId] = []
for pageserver in env.pageservers:
locations = pageserver.http_client().tenant_list_locations()
shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
log.info("Shards after split: {shards_exist}")
assert len(shards_exist) == split_shard_count
# Ensure post-split pageserver locations survive a restart (i.e. the child shards
# correctly wrote config to disk, and the storage controller responds correctly
# to /re-attach)
for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()
shards_exist = []
for pageserver in env.pageservers:
locations = pageserver.http_client().tenant_list_locations()
shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
log.info("Shards after restart: {shards_exist}")
assert len(shards_exist) == split_shard_count
workload.validate()
@pytest.mark.skipif(
# The quantity of data isn't huge, but debug can be _very_ slow, and the things we're

View File

@@ -125,20 +125,6 @@ def test_sharding_service_smoke(
time.sleep(1)
assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0
# Restarting a pageserver should not detach any tenants (i.e. /re-attach works)
before_restart = env.pageservers[1].http_client().tenant_list_locations()
env.pageservers[1].stop()
env.pageservers[1].start()
after_restart = env.pageservers[1].http_client().tenant_list_locations()
assert len(after_restart) == len(before_restart)
# Locations should be the same before & after restart, apart from generations
for _shard_id, tenant in after_restart["tenant_shards"]:
del tenant["generation"]
for _shard_id, tenant in before_restart["tenant_shards"]:
del tenant["generation"]
assert before_restart == after_restart
# Delete all the tenants
for tid in tenant_ids:
tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)
@@ -272,13 +258,8 @@ def test_sharding_service_onboarding(
env.broker.try_start()
env.attachment_service.start()
# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to storage controller, and do not register it.
env.pageservers[0].allowed_errors.append(".*Emergency mode!.*")
env.pageservers[0].start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
register=False,
)
# This is the pageserver where we'll initially create the tenant
env.pageservers[0].start(register=False)
origin_ps = env.pageservers[0]
# This is the pageserver managed by the sharding service, where the tenant

View File

@@ -299,7 +299,8 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
# tenant is created with defaults, as in without config file
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
config_path = env.pageserver.tenant_dir(tenant_id) / "config-v1"
config_path = env.pageserver.tenant_dir(tenant_id) / "config"
assert config_path.exists(), "config file is always initially created"
http_client = env.pageserver.http_client()

View File

@@ -130,6 +130,7 @@ FAILPOINTS = [
"timeline-delete-before-index-deleted-at",
"timeline-delete-before-rm",
"timeline-delete-before-index-delete",
"timeline-delete-after-rm-dir",
]
FAILPOINTS_BEFORE_BACKGROUND = [

View File

@@ -157,7 +157,10 @@ def switch_pg_to_new_pageserver(
timeline_to_detach_local_path = origin_ps.timeline_dir(tenant_id, timeline_id)
files_before_detach = os.listdir(timeline_to_detach_local_path)
assert (
len(files_before_detach) >= 1
"metadata" in files_before_detach
), f"Regular timeline {timeline_to_detach_local_path} should have the metadata file, but got: {files_before_detach}"
assert (
len(files_before_detach) >= 2
), f"Regular timeline {timeline_to_detach_local_path} should have at least one layer file, but got {files_before_detach}"
return timeline_to_detach_local_path

View File

@@ -136,9 +136,12 @@ DELETE_FAILPOINTS = [
"timeline-delete-before-index-deleted-at",
"timeline-delete-before-schedule",
"timeline-delete-before-rm",
"timeline-delete-during-rm",
"timeline-delete-after-rm",
"timeline-delete-before-index-delete",
"timeline-delete-after-index-delete",
"timeline-delete-after-rm-metadata",
"timeline-delete-after-rm-dir",
]
@@ -798,7 +801,7 @@ def test_timeline_delete_resumed_on_attach(
)
# failpoint before we remove index_part from s3
failpoint = "timeline-delete-after-rm"
failpoint = "timeline-delete-during-rm"
ps_http.configure_failpoints((failpoint, "return"))
env.pageserver.allowed_errors.extend(

View File

@@ -1,6 +1,5 @@
{
"postgres-v16": "cc98378b0fa7413b78a197e3292a806865e4056a",
"postgres-v15": "0ec04712d55539550278595e853c172f7aa5fe3e",
"postgres-v14": "4cdba8ec5a3868cec4826bbb3f16c1d3d2ac2283"
"postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269",
"postgres-v15": "ca2def999368d9df098a637234ad5a9003189463",
"postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5"
}

View File

@@ -102,7 +102,7 @@ files:
- metric_name: lfc_used
type: gauge
help: 'LFC chunks used (chunk = 1MB)'
help: 'lfc_used'
key_labels:
values: [lfc_used]
query: |
@@ -124,14 +124,6 @@ files:
query: |
select lfc_value as lfc_writes from neon.neon_lfc_stats where lfc_key='file_cache_writes';
- metric_name: lfc_cache_size_limit
type: gauge
help: 'LFC cache size limit in bytes'
key_labels:
values: [lfc_cache_size_limit]
query: |
select pg_size_bytes(current_setting('neon.file_cache_size_limit')) as lfc_cache_size_limit;
build: |
# Build cgroup-tools
#