mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 08:00:37 +00:00
Compare commits
1 Commits
vlad/wait-
...
walredo_ap
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
579470849b |
@@ -39,7 +39,7 @@ runs:
|
||||
PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
|
||||
if [ "${PR_NUMBER}" != "null" ]; then
|
||||
BRANCH_OR_PR=pr-${PR_NUMBER}
|
||||
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || [ "${GITHUB_REF_NAME}" = "release-proxy" ]; then
|
||||
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ]; then
|
||||
# Shortcut for special branches
|
||||
BRANCH_OR_PR=${GITHUB_REF_NAME}
|
||||
else
|
||||
|
||||
@@ -19,7 +19,7 @@ runs:
|
||||
PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
|
||||
if [ "${PR_NUMBER}" != "null" ]; then
|
||||
BRANCH_OR_PR=pr-${PR_NUMBER}
|
||||
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || [ "${GITHUB_REF_NAME}" = "release-proxy" ]; then
|
||||
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ]; then
|
||||
# Shortcut for special branches
|
||||
BRANCH_OR_PR=${GITHUB_REF_NAME}
|
||||
else
|
||||
|
||||
273
.github/workflows/build_and_test.yml
vendored
273
.github/workflows/build_and_test.yml
vendored
@@ -5,7 +5,6 @@ on:
|
||||
branches:
|
||||
- main
|
||||
- release
|
||||
- release-proxy
|
||||
pull_request:
|
||||
|
||||
defaults:
|
||||
@@ -68,8 +67,6 @@ jobs:
|
||||
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
|
||||
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
|
||||
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
|
||||
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
|
||||
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
|
||||
@@ -685,7 +682,7 @@ jobs:
|
||||
})
|
||||
|
||||
trigger-e2e-tests:
|
||||
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' }}
|
||||
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' }}
|
||||
needs: [ check-permissions, promote-images, tag ]
|
||||
uses: ./.github/workflows/trigger-e2e-tests.yml
|
||||
secrets: inherit
|
||||
@@ -693,173 +690,158 @@ jobs:
|
||||
neon-image:
|
||||
needs: [ check-permissions, build-buildtools-image, tag ]
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
container: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
defaults:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v1 # v3 won't work with kaniko
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
|
||||
# The default value is ~/.docker
|
||||
- name: Set custom docker config directory
|
||||
- name: Configure ECR and Docker Hub login
|
||||
run: |
|
||||
mkdir -p .docker-custom
|
||||
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
cat <<-EOF > /kaniko/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
registry: 369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
- name: Kaniko build neon
|
||||
run:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
--build-arg BUILD_TAG=${{ needs.tag.outputs.build-tag }}
|
||||
--build-arg TAG=${{ needs.build-buildtools-image.outputs.build-tools-tag }}
|
||||
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
build-args: |
|
||||
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
|
||||
TAG=${{ needs.build-buildtools-image.outputs.build-tools-tag }}
|
||||
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
provenance: false
|
||||
push: true
|
||||
pull: true
|
||||
file: Dockerfile
|
||||
cache-from: type=registry,ref=neondatabase/neon:cache
|
||||
cache-to: type=registry,ref=neondatabase/neon:cache,mode=max
|
||||
tags: |
|
||||
369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
|
||||
neondatabase/neon:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Remove custom docker config directory
|
||||
if: always()
|
||||
run: |
|
||||
rm -rf .docker-custom
|
||||
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
compute-tools-image:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
needs: [ check-permissions, build-buildtools-image, tag ]
|
||||
container: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
defaults:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
uses: actions/checkout@v1 # v3 won't work with kaniko
|
||||
|
||||
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
|
||||
# The default value is ~/.docker
|
||||
- name: Set custom docker config directory
|
||||
- name: Configure ECR and Docker Hub login
|
||||
run: |
|
||||
mkdir -p .docker-custom
|
||||
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
cat <<-EOF > /kaniko/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
registry: 369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
- name: Kaniko build compute tools
|
||||
run:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
|
||||
--build-arg TAG=${{needs.build-buildtools-image.outputs.build-tools-tag}}
|
||||
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
--dockerfile Dockerfile.compute-tools
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
build-args: |
|
||||
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
BUILD_TAG=${{needs.tag.outputs.build-tag}}
|
||||
TAG=${{needs.build-buildtools-image.outputs.build-tools-tag}}
|
||||
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
provenance: false
|
||||
push: true
|
||||
pull: true
|
||||
file: Dockerfile.compute-tools
|
||||
cache-from: type=registry,ref=neondatabase/compute-tools:cache
|
||||
cache-to: type=registry,ref=neondatabase/compute-tools:cache,mode=max
|
||||
tags: |
|
||||
369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Remove custom docker config directory
|
||||
if: always()
|
||||
run: |
|
||||
rm -rf .docker-custom
|
||||
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
compute-node-image:
|
||||
needs: [ check-permissions, build-buildtools-image, tag ]
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
|
||||
container:
|
||||
image: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
# Workaround for "Resolving download.osgeo.org (download.osgeo.org)... failed: Temporary failure in name resolution.""
|
||||
# Should be prevented by https://github.com/neondatabase/neon/issues/4281
|
||||
options: --add-host=download.osgeo.org:140.211.15.30
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
version: [ v14, v15, v16 ]
|
||||
defaults:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v1 # v3 won't work with kaniko
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
|
||||
# The default value is ~/.docker
|
||||
- name: Set custom docker config directory
|
||||
- name: Configure ECR and Docker Hub login
|
||||
run: |
|
||||
mkdir -p .docker-custom
|
||||
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
with:
|
||||
# Disable parallelism for docker buildkit.
|
||||
# As we already build everything with `make -j$(nproc)`, running it in additional level of parallelisam blows up the Runner.
|
||||
config-inline: |
|
||||
[worker.oci]
|
||||
max-parallelism = 1
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
cat <<-EOF > /kaniko/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
registry: 369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
- name: Kaniko build compute node with extensions
|
||||
run:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
--build-arg PG_VERSION=${{ matrix.version }}
|
||||
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
|
||||
--build-arg TAG=${{needs.build-buildtools-image.outputs.build-tools-tag}}
|
||||
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
--dockerfile Dockerfile.compute-node
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
--cleanup
|
||||
|
||||
- uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
build-args: |
|
||||
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
PG_VERSION=${{ matrix.version }}
|
||||
BUILD_TAG=${{needs.tag.outputs.build-tag}}
|
||||
TAG=${{needs.build-buildtools-image.outputs.build-tools-tag}}
|
||||
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
provenance: false
|
||||
push: true
|
||||
pull: true
|
||||
file: Dockerfile.compute-node
|
||||
cache-from: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache
|
||||
cache-to: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache,mode=max
|
||||
tags: |
|
||||
369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Remove custom docker config directory
|
||||
if: always()
|
||||
run: |
|
||||
rm -rf .docker-custom
|
||||
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
vm-compute-node-image:
|
||||
needs: [ check-permissions, tag, compute-node-image ]
|
||||
@@ -970,7 +952,9 @@ jobs:
|
||||
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v16:${{needs.tag.outputs.build-tag}} vm-compute-node-v16
|
||||
|
||||
- name: Add latest tag to images
|
||||
if: github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy'
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
|
||||
@@ -982,7 +966,9 @@ jobs:
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v16:${{needs.tag.outputs.build-tag}} latest
|
||||
|
||||
- name: Push images to production ECR
|
||||
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/neon:latest
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest
|
||||
@@ -1006,7 +992,9 @@ jobs:
|
||||
crane push vm-compute-node-v16 neondatabase/vm-compute-node-v16:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push latest tags to Docker Hub
|
||||
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane tag neondatabase/neon:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
|
||||
@@ -1096,7 +1084,7 @@ jobs:
|
||||
|
||||
deploy:
|
||||
needs: [ check-permissions, promote-images, tag, regress-tests, trigger-custom-extensions-build-and-wait ]
|
||||
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
|
||||
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
|
||||
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
|
||||
@@ -1131,28 +1119,14 @@ jobs:
|
||||
# TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
|
||||
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
|
||||
-f deployPgSniRouter=false \
|
||||
-f deployProxy=false \
|
||||
-f deployStorage=true \
|
||||
-f deployStorageBroker=true \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
|
||||
-f deployPgSniRouter=true \
|
||||
-f deployProxy=true \
|
||||
-f deployStorage=false \
|
||||
-f deployStorageBroker=false \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
else
|
||||
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
- name: Create git tag
|
||||
if: github.ref_name == 'release' || github.ref_name == 'release-proxy'
|
||||
if: github.ref_name == 'release'
|
||||
uses: actions/github-script@v7
|
||||
with:
|
||||
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
|
||||
@@ -1165,7 +1139,6 @@ jobs:
|
||||
sha: context.sha,
|
||||
})
|
||||
|
||||
# TODO: check how GitHub releases looks for proxy releases and enable it if it's ok
|
||||
- name: Create GitHub release
|
||||
if: github.ref_name == 'release'
|
||||
uses: actions/github-script@v7
|
||||
|
||||
32
.github/workflows/cleanup-caches-by-a-branch.yml
vendored
32
.github/workflows/cleanup-caches-by-a-branch.yml
vendored
@@ -1,32 +0,0 @@
|
||||
# A workflow from
|
||||
# https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries
|
||||
|
||||
name: cleanup caches by a branch
|
||||
on:
|
||||
pull_request:
|
||||
types:
|
||||
- closed
|
||||
|
||||
jobs:
|
||||
cleanup:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Cleanup
|
||||
run: |
|
||||
gh extension install actions/gh-actions-cache
|
||||
|
||||
echo "Fetching list of cache key"
|
||||
cacheKeysForPR=$(gh actions-cache list -R $REPO -B $BRANCH -L 100 | cut -f 1 )
|
||||
|
||||
## Setting this to not fail the workflow while deleting cache keys.
|
||||
set +e
|
||||
echo "Deleting caches..."
|
||||
for cacheKey in $cacheKeysForPR
|
||||
do
|
||||
gh actions-cache delete $cacheKey -R $REPO -B $BRANCH --confirm
|
||||
done
|
||||
echo "Done"
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
REPO: ${{ github.repository }}
|
||||
BRANCH: refs/pull/${{ github.event.pull_request.number }}/merge
|
||||
83
.github/workflows/release.yml
vendored
83
.github/workflows/release.yml
vendored
@@ -2,31 +2,12 @@ name: Create Release Branch
|
||||
|
||||
on:
|
||||
schedule:
|
||||
# It should be kept in sync with if-condition in jobs
|
||||
- cron: '0 6 * * MON' # Storage release
|
||||
- cron: '0 6 * * THU' # Proxy release
|
||||
- cron: '0 6 * * 1'
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
create-storage-release-branch:
|
||||
type: boolean
|
||||
description: 'Create Storage release PR'
|
||||
required: false
|
||||
create-proxy-release-branch:
|
||||
type: boolean
|
||||
description: 'Create Proxy release PR'
|
||||
required: false
|
||||
|
||||
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
|
||||
permissions: {}
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -euo pipefail {0}
|
||||
|
||||
jobs:
|
||||
create-storage-release-branch:
|
||||
if: ${{ github.event.schedule == '0 6 * * MON' || format('{0}', inputs.create-storage-release-branch) == 'true' }}
|
||||
runs-on: ubuntu-latest
|
||||
create_release_branch:
|
||||
runs-on: [ ubuntu-latest ]
|
||||
|
||||
permissions:
|
||||
contents: write # for `git push`
|
||||
@@ -37,67 +18,27 @@ jobs:
|
||||
with:
|
||||
ref: main
|
||||
|
||||
- name: Set environment variables
|
||||
run: |
|
||||
echo "RELEASE_DATE=$(date +'%Y-%m-%d')" | tee -a $GITHUB_ENV
|
||||
echo "RELEASE_BRANCH=rc/$(date +'%Y-%m-%d')" | tee -a $GITHUB_ENV
|
||||
- name: Get current date
|
||||
id: date
|
||||
run: echo "date=$(date +'%Y-%m-%d')" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Create release branch
|
||||
run: git checkout -b $RELEASE_BRANCH
|
||||
run: git checkout -b releases/${{ steps.date.outputs.date }}
|
||||
|
||||
- name: Push new branch
|
||||
run: git push origin $RELEASE_BRANCH
|
||||
run: git push origin releases/${{ steps.date.outputs.date }}
|
||||
|
||||
- name: Create pull request into release
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
cat << EOF > body.md
|
||||
## Release ${RELEASE_DATE}
|
||||
## Release ${{ steps.date.outputs.date }}
|
||||
|
||||
**Please merge this Pull Request using 'Create a merge commit' button**
|
||||
**Please merge this PR using 'Create a merge commit'!**
|
||||
EOF
|
||||
|
||||
gh pr create --title "Release ${RELEASE_DATE}" \
|
||||
gh pr create --title "Release ${{ steps.date.outputs.date }}" \
|
||||
--body-file "body.md" \
|
||||
--head "${RELEASE_BRANCH}" \
|
||||
--head "releases/${{ steps.date.outputs.date }}" \
|
||||
--base "release"
|
||||
|
||||
create-proxy-release-branch:
|
||||
if: ${{ github.event.schedule == '0 6 * * THU' || format('{0}', inputs.create-proxy-release-branch) == 'true' }}
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
permissions:
|
||||
contents: write # for `git push`
|
||||
|
||||
steps:
|
||||
- name: Check out code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
ref: main
|
||||
|
||||
- name: Set environment variables
|
||||
run: |
|
||||
echo "RELEASE_DATE=$(date +'%Y-%m-%d')" | tee -a $GITHUB_ENV
|
||||
echo "RELEASE_BRANCH=rc/proxy/$(date +'%Y-%m-%d')" | tee -a $GITHUB_ENV
|
||||
|
||||
- name: Create release branch
|
||||
run: git checkout -b $RELEASE_BRANCH
|
||||
|
||||
- name: Push new branch
|
||||
run: git push origin $RELEASE_BRANCH
|
||||
|
||||
- name: Create pull request into release
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
cat << EOF > body.md
|
||||
## Proxy release ${RELEASE_DATE}
|
||||
|
||||
**Please merge this Pull Request using 'Create a merge commit' button**
|
||||
EOF
|
||||
|
||||
gh pr create --title "Proxy release ${RELEASE_DATE}}" \
|
||||
--body-file "body.md" \
|
||||
--head "${RELEASE_BRANCH}" \
|
||||
--base "release-proxy"
|
||||
|
||||
2
.github/workflows/trigger-e2e-tests.yml
vendored
2
.github/workflows/trigger-e2e-tests.yml
vendored
@@ -51,8 +51,6 @@ jobs:
|
||||
echo "tag=$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
|
||||
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
|
||||
echo "tag=release-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
|
||||
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
|
||||
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
|
||||
|
||||
@@ -786,22 +786,6 @@ RUN wget https://github.com/sraoss/pg_ivm/archive/refs/tags/v1.7.tar.gz -O pg_iv
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_ivm.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_partman"
|
||||
# compile pg_partman extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg-partman-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ENV PATH "/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://github.com/pgpartman/pg_partman/archive/refs/tags/v5.0.1.tar.gz -O pg_partman.tar.gz && \
|
||||
echo "75b541733a9659a6c90dbd40fccb904a630a32880a6e3044d0c4c5f4c8a65525 pg_partman.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_partman-src && cd pg_partman-src && tar xvzf ../pg_partman.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_partman.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -845,7 +829,6 @@ COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
|
||||
10
README.md
10
README.md
@@ -230,10 +230,6 @@ postgres=# select * from t;
|
||||
> cargo neon stop
|
||||
```
|
||||
|
||||
#### Handling build failures
|
||||
|
||||
If you encounter errors during setting up the initial tenant, it's best to stop everything (`cargo neon stop`) and remove the `.neon` directory. Then fix the problems, and start the setup again.
|
||||
|
||||
## Running tests
|
||||
|
||||
Ensure your dependencies are installed as described [here](https://github.com/neondatabase/neon#dependency-installation-notes).
|
||||
@@ -263,12 +259,6 @@ You can use [`flamegraph-rs`](https://github.com/flamegraph-rs/flamegraph) or th
|
||||
> It's a [general thing with Rust / lld / mold](https://crbug.com/919499#c16), not specific to this repository.
|
||||
> See [this PR for further instructions](https://github.com/neondatabase/neon/pull/6764).
|
||||
|
||||
## Cleanup
|
||||
|
||||
For cleaning up the source tree from build artifacts, run `make clean` in the source directory.
|
||||
|
||||
For removing every artifact from build and configure steps, run `make distclean`, and also consider removing the cargo binaries in the `target` directory, as well as the database in the `.neon` directory. Note that removing the `.neon` directorz will remove your database, with all data in it. You have been warned!
|
||||
|
||||
## Documentation
|
||||
|
||||
[docs](/docs) Contains a top-level overview of all available markdown documentation.
|
||||
|
||||
@@ -82,12 +82,6 @@ pub fn write_postgres_conf(
|
||||
ComputeMode::Replica => {
|
||||
// hot_standby is 'on' by default, but let's be explicit
|
||||
writeln!(file, "hot_standby=on")?;
|
||||
|
||||
// Inform the replica about the primary state
|
||||
// Default is 'false'
|
||||
if let Some(primary_is_running) = spec.primary_is_running {
|
||||
writeln!(file, "neon.primary_is_running={}", primary_is_running)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -655,9 +655,6 @@ pub fn handle_grants(
|
||||
// remove this code if possible. The worst thing that could happen is that
|
||||
// user won't be able to use public schema in NEW databases created in the
|
||||
// very OLD project.
|
||||
//
|
||||
// Also, alter default permissions so that relations created by extensions can be
|
||||
// used by neon_superuser without permission issues.
|
||||
let grant_query = "DO $$\n\
|
||||
BEGIN\n\
|
||||
IF EXISTS(\n\
|
||||
@@ -676,8 +673,6 @@ pub fn handle_grants(
|
||||
GRANT CREATE ON SCHEMA public TO web_access;\n\
|
||||
END IF;\n\
|
||||
END IF;\n\
|
||||
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\
|
||||
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\
|
||||
END\n\
|
||||
$$;"
|
||||
.to_string();
|
||||
@@ -782,12 +777,9 @@ BEGIN
|
||||
END
|
||||
$$;"#,
|
||||
"GRANT pg_monitor TO neon_superuser WITH ADMIN OPTION",
|
||||
// Don't remove: these are some SQLs that we originally applied in migrations but turned out to execute somewhere else.
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
// Add new migrations below.
|
||||
// ensure tables created by superusers (i.e., when creating extensions) can be used by neon_superuser.
|
||||
"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser",
|
||||
"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser",
|
||||
];
|
||||
|
||||
let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
|
||||
@@ -814,13 +806,8 @@ $$;"#,
|
||||
client.simple_query(query)?;
|
||||
|
||||
while current_migration < migrations.len() {
|
||||
let migration = &migrations[current_migration];
|
||||
if migration.is_empty() {
|
||||
info!("Skip migration id={}", current_migration);
|
||||
} else {
|
||||
info!("Running migration:\n{}\n", migration);
|
||||
client.simple_query(migration)?;
|
||||
}
|
||||
info!("Running migration:\n{}\n", migrations[current_migration]);
|
||||
client.simple_query(migrations[current_migration])?;
|
||||
current_migration += 1;
|
||||
}
|
||||
let setval = format!(
|
||||
|
||||
@@ -936,8 +936,7 @@ impl Service {
|
||||
node_id: reattach_req.node_id,
|
||||
availability: Some(NodeAvailability::Active),
|
||||
scheduling: None,
|
||||
})
|
||||
.await?;
|
||||
})?;
|
||||
|
||||
// Ordering: we must persist generation number updates before making them visible in the in-memory state
|
||||
let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
|
||||
|
||||
@@ -590,7 +590,6 @@ impl Endpoint {
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size: Some(shard_stripe_size),
|
||||
primary_is_running: None,
|
||||
};
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
|
||||
@@ -391,6 +391,11 @@ impl PageServerNode {
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
gc_feedback: settings
|
||||
.remove("gc_feedback")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
|
||||
lazy_slru_download: settings
|
||||
.remove("lazy_slru_download")
|
||||
@@ -496,6 +501,11 @@ impl PageServerNode {
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
gc_feedback: settings
|
||||
.remove("gc_feedback")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
|
||||
lazy_slru_download: settings
|
||||
.remove("lazy_slru_download")
|
||||
|
||||
@@ -79,12 +79,6 @@ pub struct ComputeSpec {
|
||||
// Stripe size for pageserver sharding, in pages
|
||||
#[serde(default)]
|
||||
pub shard_stripe_size: Option<usize>,
|
||||
|
||||
// When we are starting a new replica in hot standby mode,
|
||||
// we need to know if the primary is running.
|
||||
// This is used to determine if replica should wait for
|
||||
// RUNNING_XACTS from primary or not.
|
||||
pub primary_is_running: Option<bool>,
|
||||
}
|
||||
|
||||
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
|
||||
|
||||
@@ -283,6 +283,7 @@ pub struct TenantConfig {
|
||||
pub eviction_policy: Option<EvictionPolicy>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
pub gc_feedback: Option<bool>,
|
||||
pub heatmap_period: Option<String>,
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
pub timeline_get_throttle: Option<ThrottleConfig>,
|
||||
|
||||
@@ -80,9 +80,6 @@ pub const XLOG_XACT_ABORT: u8 = 0x20;
|
||||
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
|
||||
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
|
||||
|
||||
// From standbydefs.h
|
||||
pub const XLOG_RUNNING_XACTS: u8 = 0x10;
|
||||
|
||||
// From srlu.h
|
||||
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
|
||||
pub const SLRU_SEG_SIZE: usize = BLCKSZ as usize * SLRU_PAGES_PER_SEGMENT as usize;
|
||||
|
||||
@@ -119,6 +119,11 @@ pub fn generate_pg_control(
|
||||
// Generate new pg_control needed for bootstrap
|
||||
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
|
||||
|
||||
//reset some fields we don't want to preserve
|
||||
//TODO Check this.
|
||||
//We may need to determine the value from twophase data.
|
||||
checkpoint.oldestActiveXid = 0;
|
||||
|
||||
//save new values in pg_control
|
||||
pg_control.checkPoint = 0;
|
||||
pg_control.checkPointCopy = checkpoint;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
fs::{self, File},
|
||||
io::{self, Write},
|
||||
io,
|
||||
};
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -161,48 +161,6 @@ pub async fn durable_rename(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`].
|
||||
///
|
||||
/// The file is first written to the specified `tmp_path`, and in a second
|
||||
/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
|
||||
/// and atomic rename guarantee that, if we crash at any point, there will never
|
||||
/// be a partially written file at `final_path` (but maybe at `tmp_path`).
|
||||
///
|
||||
/// Callers are responsible for serializing calls of this function for a given `final_path`.
|
||||
/// If they don't, there may be an error due to conflicting `tmp_path`, or there will
|
||||
/// be no error and the content of `final_path` will be the "winner" caller's `content`.
|
||||
/// I.e., the atomticity guarantees still hold.
|
||||
pub fn overwrite(
|
||||
final_path: &Utf8Path,
|
||||
tmp_path: &Utf8Path,
|
||||
content: &[u8],
|
||||
) -> std::io::Result<()> {
|
||||
let Some(final_path_parent) = final_path.parent() else {
|
||||
return Err(std::io::Error::from_raw_os_error(
|
||||
nix::errno::Errno::EINVAL as i32,
|
||||
));
|
||||
};
|
||||
std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
// Use `create_new` so that, if we race with ourselves or something else,
|
||||
// we bail out instead of causing damage.
|
||||
.create_new(true)
|
||||
.open(tmp_path)?;
|
||||
file.write_all(content)?;
|
||||
file.sync_all()?;
|
||||
drop(file); // don't keep the fd open for longer than we have to
|
||||
|
||||
std::fs::rename(tmp_path, final_path)?;
|
||||
|
||||
let final_parent_dirfd = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(final_path_parent)?;
|
||||
|
||||
final_parent_dirfd.sync_all()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::tenant::{
|
||||
};
|
||||
use crate::virtual_file;
|
||||
use crate::{
|
||||
IGNORED_TENANT_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
|
||||
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
|
||||
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
|
||||
};
|
||||
|
||||
@@ -140,6 +140,7 @@ pub mod defaults {
|
||||
|
||||
#min_resident_size_override = .. # in bytes
|
||||
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
|
||||
#gc_feedback = false
|
||||
|
||||
#heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
|
||||
#secondary_download_concurrency = {DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY}
|
||||
@@ -825,6 +826,17 @@ impl PageServerConf {
|
||||
.join(connection_id.to_string())
|
||||
}
|
||||
|
||||
/// Points to a place in pageserver's local directory,
|
||||
/// where certain timeline's metadata file should be located.
|
||||
pub fn metadata_path(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Utf8PathBuf {
|
||||
self.timeline_path(tenant_shard_id, timeline_id)
|
||||
.join(METADATA_FILE_NAME)
|
||||
}
|
||||
|
||||
/// Turns storage remote path of a file into its local path.
|
||||
pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
|
||||
remote_path.with_base(&self.workdir)
|
||||
|
||||
@@ -234,7 +234,7 @@ impl DeletionHeader {
|
||||
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
|
||||
let header_path = conf.deletion_header_path();
|
||||
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
|
||||
VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
|
||||
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
|
||||
.await
|
||||
.maybe_fatal_err("save deletion header")?;
|
||||
|
||||
@@ -325,8 +325,7 @@ impl DeletionList {
|
||||
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
|
||||
|
||||
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
|
||||
|
||||
VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
|
||||
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
|
||||
.await
|
||||
.maybe_fatal_err("save deletion list")
|
||||
.map_err(Into::into)
|
||||
|
||||
@@ -567,6 +567,114 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_id}/attach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
post:
|
||||
description: |
|
||||
Schedules attach operation to happen in the background for the given tenant.
|
||||
As soon as the caller sends this request, it must assume the pageserver
|
||||
starts writing to the tenant's S3 state unless it receives one of the
|
||||
distinguished errors below that state otherwise.
|
||||
|
||||
If a client receives a not-distinguished response, e.g., a network timeout,
|
||||
it MUST retry the /attach request and poll again for the tenant's
|
||||
attachment status.
|
||||
|
||||
After the client has received a 202, it MUST poll the tenant's
|
||||
attachment status (field `attachment_status`) to reach state `attached`.
|
||||
If the `attachment_status` is missing, the client MUST retry the `/attach`
|
||||
request (goto previous paragraph). This is a robustness measure in case the tenant
|
||||
status endpoint is buggy, but the attach operation is ongoing.
|
||||
|
||||
There is no way to cancel an in-flight request.
|
||||
|
||||
In any case, the client
|
||||
* MUST NOT ASSUME that the /attach request has been lost in the network,
|
||||
* MUST NOT ASSUME that the request has been lost, based on the observation
|
||||
that a subsequent tenant status request returns 404. The request may
|
||||
still be in flight. It must be retried.
|
||||
|
||||
The client SHOULD supply a `TenantConfig` for the tenant in the request body.
|
||||
Settings specified in the config override the pageserver's defaults.
|
||||
It is guaranteed that the config settings are applied before the pageserver
|
||||
starts operating on the tenant. E.g., if the config specifies a specific
|
||||
PITR interval for a tenant, then that setting will be in effect before the
|
||||
pageserver starts the garbage collection loop. This enables a client to
|
||||
guarantee a specific PITR setting across detach/attach cycles.
|
||||
The pageserver will reject the request if it cannot parse the config, or
|
||||
if there are any unknown fields in it.
|
||||
|
||||
If the client does not supply a config, the pageserver will use its defaults.
|
||||
This behavior is deprecated: https://github.com/neondatabase/neon/issues/4282
|
||||
requestBody:
|
||||
required: false
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/TenantAttachRequest"
|
||||
responses:
|
||||
"202":
|
||||
description: Tenant attaching scheduled
|
||||
"400":
|
||||
description: Bad Request
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"404":
|
||||
description: Timeline not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotFoundError"
|
||||
"409":
|
||||
description: |
|
||||
The tenant is already known to Pageserver in some way,
|
||||
and hence this `/attach` call has been rejected.
|
||||
|
||||
Some examples of how this can happen:
|
||||
- tenant was created on this pageserver
|
||||
- tenant attachment was started by an earlier call to `/attach`.
|
||||
|
||||
Callers should poll the tenant status's `attachment_status` field,
|
||||
like for status 202. See the longer description for `POST /attach`
|
||||
for details.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ConflictError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/location_config:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -662,6 +770,66 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/detach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: detach_ignored
|
||||
in: query
|
||||
required: false
|
||||
schema:
|
||||
type: boolean
|
||||
description: |
|
||||
When true, allow to detach a tenant which state is ignored.
|
||||
post:
|
||||
description: |
|
||||
Remove tenant data (including all corresponding timelines) from pageserver's memory and file system.
|
||||
Files on the remote storage are not affected.
|
||||
responses:
|
||||
"200":
|
||||
description: Tenant detached
|
||||
"400":
|
||||
description: Error when no tenant id found in path parameters
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"404":
|
||||
description: Tenant not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotFoundError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/ignore:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -1296,6 +1464,16 @@ components:
|
||||
generation:
|
||||
type: integer
|
||||
description: Attachment generation number.
|
||||
TenantAttachRequest:
|
||||
type: object
|
||||
required:
|
||||
- config
|
||||
properties:
|
||||
config:
|
||||
$ref: '#/components/schemas/TenantConfig'
|
||||
generation:
|
||||
type: integer
|
||||
description: Attachment generation number.
|
||||
TenantConfigRequest:
|
||||
allOf:
|
||||
- $ref: '#/components/schemas/TenantConfig'
|
||||
|
||||
@@ -169,6 +169,15 @@ pub fn is_delete_mark(path: &Utf8Path) -> bool {
|
||||
ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
|
||||
}
|
||||
|
||||
fn is_walkdir_io_not_found(e: &walkdir::Error) -> bool {
|
||||
if let Some(e) = e.io_error() {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
|
||||
/// blocking.
|
||||
///
|
||||
|
||||
@@ -642,6 +642,26 @@ pub(crate) static TENANT_SYNTHETIC_SIZE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|
|
||||
.expect("Failed to register pageserver_tenant_synthetic_cached_size_bytes metric")
|
||||
});
|
||||
|
||||
// Metrics for cloud upload. These metrics reflect data uploaded to cloud storage,
|
||||
// or in testing they estimate how much we would upload if we did.
|
||||
static NUM_PERSISTENT_FILES_CREATED: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_created_persistent_files_total",
|
||||
"Number of files created that are meant to be uploaded to cloud storage",
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PERSISTENT_BYTES_WRITTEN: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_written_persistent_bytes_total",
|
||||
"Total bytes written that are meant to be uploaded to cloud storage",
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static EVICTION_ITERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_eviction_iteration_duration_seconds_global",
|
||||
@@ -1782,6 +1802,8 @@ pub(crate) struct TimelineMetrics {
|
||||
/// copy of LayeredTimeline.current_logical_size
|
||||
pub current_logical_size_gauge: UIntGauge,
|
||||
pub directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>>,
|
||||
pub num_persistent_files_created: IntCounter,
|
||||
pub persistent_bytes_written: IntCounter,
|
||||
pub evictions: IntCounter,
|
||||
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
|
||||
}
|
||||
@@ -1863,6 +1885,12 @@ impl TimelineMetrics {
|
||||
};
|
||||
let directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>> =
|
||||
Lazy::new(Box::new(directory_entries_count_gauge_closure));
|
||||
let num_persistent_files_created = NUM_PERSISTENT_FILES_CREATED
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
let persistent_bytes_written = PERSISTENT_BYTES_WRITTEN
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
let evictions = EVICTIONS
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
@@ -1884,6 +1912,8 @@ impl TimelineMetrics {
|
||||
resident_physical_size_gauge,
|
||||
current_logical_size_gauge,
|
||||
directory_entries_count_gauge,
|
||||
num_persistent_files_created,
|
||||
persistent_bytes_written,
|
||||
evictions,
|
||||
evictions_with_low_residence_duration: std::sync::RwLock::new(
|
||||
evictions_with_low_residence_duration,
|
||||
@@ -1893,6 +1923,8 @@ impl TimelineMetrics {
|
||||
|
||||
pub(crate) fn record_new_file_metrics(&self, sz: u64) {
|
||||
self.resident_physical_size_add(sz);
|
||||
self.num_persistent_files_created.inc_by(1);
|
||||
self.persistent_bytes_written.inc_by(sz);
|
||||
}
|
||||
|
||||
pub(crate) fn resident_physical_size_sub(&self, sz: u64) {
|
||||
@@ -1925,6 +1957,9 @@ impl Drop for TimelineMetrics {
|
||||
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
|
||||
let _ = metric.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
}
|
||||
let _ =
|
||||
NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
let _ = EVICTIONS.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
|
||||
self.evictions_with_low_residence_duration
|
||||
|
||||
@@ -29,6 +29,7 @@ use remote_storage::TimeoutOrCancel;
|
||||
use std::fmt;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -171,6 +172,9 @@ pub(crate) mod throttle;
|
||||
pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
|
||||
|
||||
// re-export for use in remote_timeline_client.rs
|
||||
pub use crate::tenant::metadata::save_metadata;
|
||||
|
||||
// re-export for use in walreceiver
|
||||
pub use crate::tenant::timeline::WalReceiverInfo;
|
||||
|
||||
@@ -1147,6 +1151,17 @@ impl Tenant {
|
||||
None
|
||||
};
|
||||
|
||||
// timeline loading after attach expects to find metadata file for each metadata
|
||||
save_metadata(
|
||||
self.conf,
|
||||
&self.tenant_shard_id,
|
||||
&timeline_id,
|
||||
&remote_metadata,
|
||||
)
|
||||
.await
|
||||
.context("save_metadata")
|
||||
.map_err(LoadLocalTimelineError::Load)?;
|
||||
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
resources,
|
||||
@@ -2573,24 +2588,19 @@ impl Tenant {
|
||||
legacy_config_path: &Utf8Path,
|
||||
location_conf: &LocationConf,
|
||||
) -> anyhow::Result<()> {
|
||||
// Forward compat: write out an old-style configuration that old versions can read, in case we roll back
|
||||
Self::persist_tenant_config_legacy(
|
||||
tenant_shard_id,
|
||||
legacy_config_path,
|
||||
&location_conf.tenant_conf,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let LocationMode::Attached(attach_conf) = &location_conf.mode {
|
||||
// The modern-style LocationConf config file requires a generation to be set. In case someone
|
||||
// is running a pageserver without the infrastructure to set generations, write out the legacy-style
|
||||
// config file that only contains TenantConf.
|
||||
//
|
||||
// This will eventually be removed in https://github.com/neondatabase/neon/issues/5388
|
||||
|
||||
// Once we use LocationMode, generations are mandatory. If we aren't using generations,
|
||||
// then drop out after writing legacy-style config.
|
||||
if attach_conf.generation.is_none() {
|
||||
tracing::info!(
|
||||
"Running without generations, writing legacy-style tenant config file"
|
||||
);
|
||||
Self::persist_tenant_config_legacy(
|
||||
tenant_shard_id,
|
||||
legacy_config_path,
|
||||
&location_conf.tenant_conf,
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::debug!("Running without generations, not writing new-style LocationConf");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
@@ -2613,10 +2623,17 @@ impl Tenant {
|
||||
|
||||
let tenant_shard_id = *tenant_shard_id;
|
||||
let config_path = config_path.to_owned();
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Handle::current().block_on(async move {
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("write tenant {tenant_shard_id} config to {config_path}")
|
||||
})
|
||||
})
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2643,12 +2660,17 @@ impl Tenant {
|
||||
|
||||
let tenant_shard_id = *tenant_shard_id;
|
||||
let target_config_path = target_config_path.to_owned();
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("write tenant {tenant_shard_id} config to {target_config_path}")
|
||||
})?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Handle::current().block_on(async move {
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("write tenant {tenant_shard_id} config to {target_config_path}")
|
||||
})
|
||||
})
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -3271,7 +3293,10 @@ impl Tenant {
|
||||
|
||||
timeline_struct.init_empty_layer_map(start_lsn);
|
||||
|
||||
if let Err(e) = self.create_timeline_files(&uninit_mark.timeline_path).await {
|
||||
if let Err(e) = self
|
||||
.create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
|
||||
.await
|
||||
{
|
||||
error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
|
||||
cleanup_timeline_directory(uninit_mark);
|
||||
return Err(e);
|
||||
@@ -3288,13 +3313,26 @@ impl Tenant {
|
||||
))
|
||||
}
|
||||
|
||||
async fn create_timeline_files(&self, timeline_path: &Utf8Path) -> anyhow::Result<()> {
|
||||
async fn create_timeline_files(
|
||||
&self,
|
||||
timeline_path: &Utf8Path,
|
||||
new_timeline_id: &TimelineId,
|
||||
new_metadata: &TimelineMetadata,
|
||||
) -> anyhow::Result<()> {
|
||||
crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
|
||||
|
||||
fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
|
||||
anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
|
||||
});
|
||||
|
||||
save_metadata(
|
||||
self.conf,
|
||||
&self.tenant_shard_id,
|
||||
new_timeline_id,
|
||||
new_metadata,
|
||||
)
|
||||
.await
|
||||
.context("Failed to create timeline metadata")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -3646,6 +3684,7 @@ pub(crate) mod harness {
|
||||
evictions_low_residence_duration_metric_threshold: Some(
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold,
|
||||
),
|
||||
gc_feedback: Some(tenant_conf.gc_feedback),
|
||||
heatmap_period: Some(tenant_conf.heatmap_period),
|
||||
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
|
||||
timeline_get_throttle: Some(tenant_conf.timeline_get_throttle),
|
||||
|
||||
@@ -339,6 +339,7 @@ pub struct TenantConf {
|
||||
// See the corresponding metric's help string.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
pub gc_feedback: bool,
|
||||
|
||||
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
|
||||
/// may be disabled if a Tenant will not have secondary locations: only secondary
|
||||
@@ -426,6 +427,10 @@ pub struct TenantConfOpt {
|
||||
#[serde(default)]
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub gc_feedback: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
@@ -480,6 +485,7 @@ impl TenantConfOpt {
|
||||
evictions_low_residence_duration_metric_threshold: self
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
|
||||
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
|
||||
heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period),
|
||||
lazy_slru_download: self
|
||||
.lazy_slru_download
|
||||
@@ -524,6 +530,7 @@ impl Default for TenantConf {
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
gc_feedback: false,
|
||||
heatmap_period: Duration::ZERO,
|
||||
lazy_slru_download: false,
|
||||
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
|
||||
@@ -596,6 +603,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
evictions_low_residence_duration_metric_threshold: value
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.map(humantime),
|
||||
gc_feedback: value.gc_feedback,
|
||||
heatmap_period: value.heatmap_period.map(humantime),
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
|
||||
|
||||
@@ -8,11 +8,20 @@
|
||||
//!
|
||||
//! [`remote_timeline_client`]: super::remote_timeline_client
|
||||
|
||||
use anyhow::ensure;
|
||||
use std::io::{self};
|
||||
|
||||
use anyhow::{ensure, Context};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::{de::Error, Deserialize, Serialize, Serializer};
|
||||
use thiserror::Error;
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
|
||||
/// Use special format number to enable backward compatibility.
|
||||
const METADATA_FORMAT_VERSION: u16 = 4;
|
||||
|
||||
@@ -259,6 +268,32 @@ impl Serialize for TimelineMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
/// Save timeline metadata to file
|
||||
#[tracing::instrument(skip_all, fields(%tenant_id=tenant_shard_id.tenant_id, %shard_id=tenant_shard_id.shard_slug(), %timeline_id))]
|
||||
pub async fn save_metadata(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
data: &TimelineMetadata,
|
||||
) -> anyhow::Result<()> {
|
||||
let path = conf.metadata_path(tenant_shard_id, timeline_id);
|
||||
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
|
||||
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
|
||||
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
|
||||
.await
|
||||
.context("write metadata")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum LoadMetadataError {
|
||||
#[error(transparent)]
|
||||
Read(#[from] io::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
Decode(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -42,7 +42,7 @@ use crate::tenant::config::{
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
|
||||
use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
|
||||
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::fs_ext::PathExt;
|
||||
@@ -359,6 +359,12 @@ fn load_tenant_config(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
|
||||
if tenant_ignore_mark_file.exists() {
|
||||
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let tenant_shard_id = match tenant_dir_path
|
||||
.file_name()
|
||||
.unwrap_or_default()
|
||||
@@ -371,59 +377,6 @@ fn load_tenant_config(
|
||||
}
|
||||
};
|
||||
|
||||
// Clean up legacy `metadata` files.
|
||||
// Doing it here because every single tenant directory is visited here.
|
||||
// In any later code, there's different treatment of tenant dirs
|
||||
// ... depending on whether the tenant is in re-attach response or not
|
||||
// ... epending on whether the tenant is ignored or not
|
||||
assert_eq!(
|
||||
&conf.tenant_path(&tenant_shard_id),
|
||||
&tenant_dir_path,
|
||||
"later use of conf....path() methods would be dubious"
|
||||
);
|
||||
let timelines: Vec<TimelineId> = match conf.timelines_path(&tenant_shard_id).read_dir_utf8() {
|
||||
Ok(iter) => {
|
||||
let mut timelines = Vec::new();
|
||||
for res in iter {
|
||||
let p = res?;
|
||||
let Some(timeline_id) = p.file_name().parse::<TimelineId>().ok() else {
|
||||
// skip any entries that aren't TimelineId, such as
|
||||
// - *.___temp dirs
|
||||
// - unfinished initdb uploads (test_non_uploaded_root_timeline_is_deleted_after_restart)
|
||||
continue;
|
||||
};
|
||||
timelines.push(timeline_id);
|
||||
}
|
||||
timelines
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => vec![],
|
||||
Err(e) => return Err(anyhow::anyhow!(e)),
|
||||
};
|
||||
for timeline_id in timelines {
|
||||
let timeline_path = &conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
let metadata_path = timeline_path.join(METADATA_FILE_NAME);
|
||||
match std::fs::remove_file(&metadata_path) {
|
||||
Ok(()) => {
|
||||
crashsafe::fsync(timeline_path)
|
||||
.context("fsync timeline dir after removing legacy metadata file")?;
|
||||
info!("removed legacy metadata file at {metadata_path}");
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
// something removed the file earlier, or it was never there
|
||||
// We don't care, this software version doesn't write it again, so, we're good.
|
||||
}
|
||||
Err(e) => {
|
||||
anyhow::bail!("remove legacy metadata file: {e}: {metadata_path}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
|
||||
if tenant_ignore_mark_file.exists() {
|
||||
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(Some((
|
||||
tenant_shard_id,
|
||||
Tenant::load_tenant_config(conf, &tenant_shard_id),
|
||||
|
||||
@@ -45,7 +45,7 @@ use rand::Rng;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, instrument, warn, Instrument};
|
||||
use tracing::{info_span, instrument, Instrument};
|
||||
use utils::{
|
||||
backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
|
||||
};
|
||||
@@ -491,9 +491,14 @@ impl<'a> TenantDownloader<'a> {
|
||||
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
|
||||
let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
|
||||
let heatmap_path_bg = heatmap_path.clone();
|
||||
VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
|
||||
.await
|
||||
.maybe_fatal_err(&context_msg)?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
|
||||
})
|
||||
})
|
||||
.await
|
||||
.expect("Blocking task is never aborted")
|
||||
.maybe_fatal_err(&context_msg)?;
|
||||
|
||||
tracing::debug!("Wrote local heatmap to {}", heatmap_path);
|
||||
|
||||
@@ -786,7 +791,6 @@ async fn init_timeline_state(
|
||||
let file_name = file_path.file_name().expect("created it from the dentry");
|
||||
if file_name == METADATA_FILE_NAME {
|
||||
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
|
||||
warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
|
||||
continue;
|
||||
} else if crate::is_temporary(&file_path) {
|
||||
// Temporary files are frequently left behind from restarting during downloads
|
||||
|
||||
@@ -54,7 +54,7 @@ use crate::pgdatadir_mapping::DirectoryKind;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
metadata::{save_metadata, TimelineMetadata},
|
||||
par_fsync,
|
||||
};
|
||||
use crate::{
|
||||
@@ -76,7 +76,7 @@ use crate::{
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
|
||||
use crate::metrics::{
|
||||
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
|
||||
};
|
||||
@@ -210,6 +210,17 @@ pub struct Timeline {
|
||||
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
|
||||
pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
/// It is used by compaction task when it checks if new image layer should be created.
|
||||
/// Newly created image layer doesn't help to remove the delta layer, until the
|
||||
/// newly created image layer falls off the PITR horizon. So on next GC cycle,
|
||||
/// gc_timeline may still want the new image layer to be created. To avoid redundant
|
||||
/// image layers creation we should check if image layer exists but beyond PITR horizon.
|
||||
/// This is why we need remember GC cutoff LSN.
|
||||
///
|
||||
wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
|
||||
|
||||
last_freeze_at: AtomicLsn,
|
||||
// Atomic would be more appropriate here.
|
||||
last_freeze_ts: RwLock<Instant>,
|
||||
@@ -292,7 +303,7 @@ pub struct Timeline {
|
||||
pub initdb_lsn: Lsn,
|
||||
|
||||
/// When did we last calculate the partitioning?
|
||||
partitioning: tokio::sync::Mutex<(KeyPartitioning, Lsn)>,
|
||||
partitioning: Mutex<(KeyPartitioning, Lsn)>,
|
||||
|
||||
/// Configuration: how often should the partitioning be recalculated.
|
||||
repartition_threshold: u64,
|
||||
@@ -334,7 +345,7 @@ pub struct Timeline {
|
||||
///
|
||||
/// Must only be taken in two places:
|
||||
/// - [`Timeline::compact`] (this file)
|
||||
/// - [`delete::delete_local_timeline_directory`]
|
||||
/// - [`delete::delete_local_layer_files`]
|
||||
///
|
||||
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
|
||||
compaction_lock: tokio::sync::Mutex<()>,
|
||||
@@ -343,7 +354,7 @@ pub struct Timeline {
|
||||
///
|
||||
/// Must only be taken in two places:
|
||||
/// - [`Timeline::gc`] (this file)
|
||||
/// - [`delete::delete_local_timeline_directory`]
|
||||
/// - [`delete::delete_local_layer_files`]
|
||||
///
|
||||
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
|
||||
gc_lock: tokio::sync::Mutex<()>,
|
||||
@@ -1505,6 +1516,13 @@ impl Timeline {
|
||||
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
|
||||
}
|
||||
|
||||
fn get_gc_feedback(&self) -> bool {
|
||||
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.gc_feedback
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
@@ -1578,6 +1596,7 @@ impl Timeline {
|
||||
shard_identity,
|
||||
pg_version,
|
||||
layers: Default::default(),
|
||||
wanted_image_layers: Mutex::new(None),
|
||||
|
||||
walredo_mgr,
|
||||
walreceiver: Mutex::new(None),
|
||||
@@ -1640,7 +1659,7 @@ impl Timeline {
|
||||
// initial logical size is 0.
|
||||
LogicalSize::empty_initial()
|
||||
},
|
||||
partitioning: tokio::sync::Mutex::new((KeyPartitioning::new(), Lsn(0))),
|
||||
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
|
||||
repartition_threshold: 0,
|
||||
|
||||
last_received_wal: Mutex::new(None),
|
||||
@@ -1826,11 +1845,7 @@ impl Timeline {
|
||||
discovered_layers.push((file_name, file_size));
|
||||
continue;
|
||||
}
|
||||
Discovered::Metadata => {
|
||||
warn!("found legacy metadata file, these should have been removed in load_tenant_config");
|
||||
continue;
|
||||
}
|
||||
Discovered::IgnoredBackup => {
|
||||
Discovered::Metadata | Discovered::IgnoredBackup => {
|
||||
continue;
|
||||
}
|
||||
Discovered::Unknown(file_name) => {
|
||||
@@ -2337,7 +2352,7 @@ impl Timeline {
|
||||
fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
|
||||
if !self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id)
|
||||
.metadata_path(&self.tenant_shard_id, &self.timeline_id)
|
||||
.exists()
|
||||
{
|
||||
error!("timeline-calculate-logical-size-pre metadata file does not exist")
|
||||
@@ -3192,7 +3207,7 @@ impl Timeline {
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now. The flushed layer is stored in
|
||||
// the mapping in `create_delta_layer`.
|
||||
{
|
||||
let metadata = {
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
@@ -3206,7 +3221,9 @@ impl Timeline {
|
||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||
|
||||
// Schedule remote uploads that will reflect our new disk_consistent_lsn
|
||||
self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
|
||||
Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
// release lock on 'layers'
|
||||
};
|
||||
@@ -3221,6 +3238,22 @@ impl Timeline {
|
||||
// This failpoint is used by another test case `test_pageserver_recovery`.
|
||||
fail_point!("flush-frozen-exit");
|
||||
|
||||
// Update the metadata file, with new 'disk_consistent_lsn'
|
||||
//
|
||||
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
|
||||
// *all* the layers, to avoid fsyncing the file multiple times.
|
||||
|
||||
// If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
|
||||
if let Some(metadata) = metadata {
|
||||
save_metadata(
|
||||
self.conf,
|
||||
&self.tenant_shard_id,
|
||||
&self.timeline_id,
|
||||
&metadata,
|
||||
)
|
||||
.await
|
||||
.context("save_metadata")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -3276,6 +3309,25 @@ impl Timeline {
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
async fn update_metadata_file(
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
|
||||
) -> anyhow::Result<()> {
|
||||
let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
|
||||
|
||||
save_metadata(
|
||||
self.conf,
|
||||
&self.tenant_shard_id,
|
||||
&self.timeline_id,
|
||||
&metadata,
|
||||
)
|
||||
.await
|
||||
.context("save_metadata")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client
|
||||
@@ -3354,34 +3406,30 @@ impl Timeline {
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
|
||||
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
|
||||
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
|
||||
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
|
||||
// and hence before the compaction task starts.
|
||||
anyhow::bail!("repartition() called concurrently, this should not happen");
|
||||
};
|
||||
if lsn < partitioning_guard.1 {
|
||||
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
|
||||
}
|
||||
|
||||
let distance = lsn.0 - partitioning_guard.1 .0;
|
||||
if partitioning_guard.1 != Lsn(0)
|
||||
&& distance <= self.repartition_threshold
|
||||
&& !flags.contains(CompactFlags::ForceRepartition)
|
||||
{
|
||||
debug!(
|
||||
distance,
|
||||
threshold = self.repartition_threshold,
|
||||
"no repartitioning needed"
|
||||
);
|
||||
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
|
||||
let partitioning_guard = self.partitioning.lock().unwrap();
|
||||
let distance = lsn.0 - partitioning_guard.1 .0;
|
||||
if partitioning_guard.1 != Lsn(0)
|
||||
&& distance <= self.repartition_threshold
|
||||
&& !flags.contains(CompactFlags::ForceRepartition)
|
||||
{
|
||||
debug!(
|
||||
distance,
|
||||
threshold = self.repartition_threshold,
|
||||
"no repartitioning needed"
|
||||
);
|
||||
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
|
||||
}
|
||||
}
|
||||
|
||||
let keyspace = self.collect_keyspace(lsn, ctx).await?;
|
||||
let partitioning = keyspace.partition(partition_size);
|
||||
|
||||
*partitioning_guard = (partitioning, lsn);
|
||||
|
||||
let mut partitioning_guard = self.partitioning.lock().unwrap();
|
||||
if lsn > partitioning_guard.1 {
|
||||
*partitioning_guard = (partitioning, lsn);
|
||||
} else {
|
||||
warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
|
||||
}
|
||||
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
|
||||
}
|
||||
|
||||
@@ -3393,6 +3441,31 @@ impl Timeline {
|
||||
let layers = guard.layer_map();
|
||||
|
||||
let mut max_deltas = 0;
|
||||
{
|
||||
let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
|
||||
if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
|
||||
let img_range =
|
||||
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
|
||||
if wanted.overlaps(&img_range) {
|
||||
//
|
||||
// gc_timeline only pays attention to image layers that are older than the GC cutoff,
|
||||
// but create_image_layers creates image layers at last-record-lsn.
|
||||
// So it's possible that gc_timeline wants a new image layer to be created for a key range,
|
||||
// but the range is already covered by image layers at more recent LSNs. Before we
|
||||
// create a new image layer, check if the range is already covered at more recent LSNs.
|
||||
if !layers
|
||||
.image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
|
||||
{
|
||||
debug!(
|
||||
"Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
|
||||
img_range.start, img_range.end, cutoff_lsn, lsn
|
||||
);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for part_range in &partition.ranges {
|
||||
let image_coverage = layers.image_coverage(part_range, lsn);
|
||||
for (img_range, last_img) in image_coverage {
|
||||
@@ -3563,6 +3636,12 @@ impl Timeline {
|
||||
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
|
||||
}
|
||||
}
|
||||
// All layers that the GC wanted us to create have now been created.
|
||||
//
|
||||
// It's possible that another GC cycle happened while we were compacting, and added
|
||||
// something new to wanted_image_layers, and we now clear that before processing it.
|
||||
// That's OK, because the next GC iteration will put it back in.
|
||||
*self.wanted_image_layers.lock().unwrap() = None;
|
||||
|
||||
// Sync the new layer to disk before adding it to the layer map, to make sure
|
||||
// we don't garbage collect something based on the new layer, before it has
|
||||
@@ -4472,6 +4551,7 @@ impl Timeline {
|
||||
debug!("retain_lsns: {:?}", retain_lsns);
|
||||
|
||||
let mut layers_to_remove = Vec::new();
|
||||
let mut wanted_image_layers = KeySpaceRandomAccum::default();
|
||||
|
||||
// Scan all layers in the timeline (remote or on-disk).
|
||||
//
|
||||
@@ -4553,6 +4633,15 @@ impl Timeline {
|
||||
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
|
||||
{
|
||||
debug!("keeping {} because it is the latest layer", l.filename());
|
||||
// Collect delta key ranges that need image layers to allow garbage
|
||||
// collecting the layers.
|
||||
// It is not so obvious whether we need to propagate information only about
|
||||
// delta layers. Image layers can form "stairs" preventing old image from been deleted.
|
||||
// But image layers are in any case less sparse than delta layers. Also we need some
|
||||
// protection from replacing recent image layers with new one after each GC iteration.
|
||||
if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
|
||||
wanted_image_layers.add_range(l.get_key_range());
|
||||
}
|
||||
result.layers_not_updated += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
@@ -4565,13 +4654,24 @@ impl Timeline {
|
||||
);
|
||||
layers_to_remove.push(l);
|
||||
}
|
||||
self.wanted_image_layers
|
||||
.lock()
|
||||
.unwrap()
|
||||
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
|
||||
|
||||
if !layers_to_remove.is_empty() {
|
||||
// Persist the new GC cutoff value before we actually remove anything.
|
||||
// Persist the new GC cutoff value in the metadata file, before
|
||||
// we actually remove anything.
|
||||
//
|
||||
// This does not in fact have any effect as we no longer consider local metadata unless
|
||||
// running without remote storage.
|
||||
//
|
||||
// This unconditionally schedules also an index_part.json update, even though, we will
|
||||
// be doing one a bit later with the unlinked gc'd layers.
|
||||
let disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
self.schedule_uploads(disk_consistent_lsn, None)?;
|
||||
//
|
||||
// TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
|
||||
self.update_metadata_file(self.disk_consistent_lsn.load(), None)
|
||||
.await?;
|
||||
|
||||
let gc_layers = layers_to_remove
|
||||
.iter()
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::{
|
||||
use anyhow::Context;
|
||||
use pageserver_api::{models::TimelineState, shard::TenantShardId};
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tracing::{debug, error, info, instrument, Instrument};
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument};
|
||||
use utils::{crashsafe, fs_ext, id::TimelineId};
|
||||
|
||||
use crate::{
|
||||
@@ -124,7 +124,7 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi
|
||||
/// No timeout here, GC & Compaction should be responsive to the
|
||||
/// `TimelineState::Stopping` change.
|
||||
// pub(super): documentation link
|
||||
pub(super) async fn delete_local_timeline_directory(
|
||||
pub(super) async fn delete_local_layer_files(
|
||||
conf: &PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline: &Timeline,
|
||||
@@ -149,6 +149,8 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
// NB: This need not be atomic because the deleted flag in the IndexPart
|
||||
// will be observed during tenant/timeline load. The deletion will be resumed there.
|
||||
//
|
||||
// For configurations without remote storage, we guarantee crash-safety by persising delete mark file.
|
||||
//
|
||||
// Note that here we do not bail out on std::io::ErrorKind::NotFound.
|
||||
// This can happen if we're called a second time, e.g.,
|
||||
// because of a previous failure/cancellation at/after
|
||||
@@ -156,21 +158,72 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
//
|
||||
// ErrorKind::NotFound can also happen if we race with tenant detach, because,
|
||||
// no locks are shared.
|
||||
tokio::fs::remove_dir_all(local_timeline_directory)
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.context("remove local timeline directory")?;
|
||||
//
|
||||
// For now, log and continue.
|
||||
// warn! level is technically not appropriate for the
|
||||
// first case because we should expect retries to happen.
|
||||
// But the error is so rare, it seems better to get attention if it happens.
|
||||
//
|
||||
// Note that metadata removal is skipped, this is not technically needed,
|
||||
// but allows to reuse timeline loading code during resumed deletion.
|
||||
// (we always expect that metadata is in place when timeline is being loaded)
|
||||
|
||||
// Make sure previous deletions are ordered before mark removal.
|
||||
// Otherwise there is no guarantee that they reach the disk before mark deletion.
|
||||
// So its possible for mark to reach disk first and for other deletions
|
||||
// to be reordered later and thus missed if a crash occurs.
|
||||
// Note that we dont need to sync after mark file is removed
|
||||
// because we can tolerate the case when mark file reappears on startup.
|
||||
let timeline_path = conf.timelines_path(&tenant_shard_id);
|
||||
crashsafe::fsync_async(timeline_path)
|
||||
.await
|
||||
.context("fsync_pre_mark_remove")?;
|
||||
#[cfg(feature = "testing")]
|
||||
let mut counter = 0;
|
||||
|
||||
// Timeline directory may not exist if we failed to delete mark file and request was retried.
|
||||
if !local_timeline_directory.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let metadata_path = conf.metadata_path(&tenant_shard_id, &timeline.timeline_id);
|
||||
|
||||
for entry in walkdir::WalkDir::new(&local_timeline_directory).contents_first(true) {
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
counter += 1;
|
||||
if counter == 2 {
|
||||
fail::fail_point!("timeline-delete-during-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-during-rm"))?
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let entry = entry?;
|
||||
if entry.path() == metadata_path {
|
||||
debug!("found metadata, skipping");
|
||||
continue;
|
||||
}
|
||||
|
||||
if entry.path() == local_timeline_directory {
|
||||
// Keeping directory because metedata file is still there
|
||||
debug!("found timeline dir itself, skipping");
|
||||
continue;
|
||||
}
|
||||
|
||||
let metadata = match entry.metadata() {
|
||||
Ok(metadata) => metadata,
|
||||
Err(e) => {
|
||||
if crate::is_walkdir_io_not_found(&e) {
|
||||
warn!(
|
||||
timeline_dir=?local_timeline_directory,
|
||||
path=?entry.path().display(),
|
||||
"got not found err while removing timeline dir, proceeding anyway"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
anyhow::bail!(e);
|
||||
}
|
||||
};
|
||||
|
||||
if metadata.is_dir() {
|
||||
warn!(path=%entry.path().display(), "unexpected directory under timeline dir");
|
||||
tokio::fs::remove_dir(entry.path()).await
|
||||
} else {
|
||||
tokio::fs::remove_file(entry.path()).await
|
||||
}
|
||||
.with_context(|| format!("Failed to remove: {}", entry.path().display()))?;
|
||||
}
|
||||
|
||||
info!("finished deleting layer files, releasing locks");
|
||||
drop(guards);
|
||||
@@ -201,6 +254,39 @@ async fn cleanup_remaining_timeline_fs_traces(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<()> {
|
||||
// Remove local metadata
|
||||
tokio::fs::remove_file(conf.metadata_path(&tenant_shard_id, &timeline_id))
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.context("remove metadata")?;
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm-metadata", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: timeline-delete-after-rm-metadata"
|
||||
))?
|
||||
});
|
||||
|
||||
// Remove timeline dir
|
||||
tokio::fs::remove_dir(conf.timeline_path(&tenant_shard_id, &timeline_id))
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.context("timeline dir")?;
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm-dir", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm-dir"))?
|
||||
});
|
||||
|
||||
// Make sure previous deletions are ordered before mark removal.
|
||||
// Otherwise there is no guarantee that they reach the disk before mark deletion.
|
||||
// So its possible for mark to reach disk first and for other deletions
|
||||
// to be reordered later and thus missed if a crash occurs.
|
||||
// Note that we dont need to sync after mark file is removed
|
||||
// because we can tolerate the case when mark file reappears on startup.
|
||||
let timeline_path = conf.timelines_path(&tenant_shard_id);
|
||||
crashsafe::fsync_async(timeline_path)
|
||||
.await
|
||||
.context("fsync_pre_mark_remove")?;
|
||||
|
||||
// Remove delete mark
|
||||
// TODO: once we are confident that no more exist in the field, remove this
|
||||
// line. It cleans up a legacy marker file that might in rare cases be present.
|
||||
@@ -466,12 +552,15 @@ impl DeleteTimelineFlow {
|
||||
tenant: &Tenant,
|
||||
timeline: &Timeline,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await?;
|
||||
delete_local_layer_files(conf, tenant.tenant_shard_id, timeline).await?;
|
||||
|
||||
delete_remote_layers_and_index(timeline).await?;
|
||||
|
||||
pausable_failpoint!("in_progress_delete");
|
||||
|
||||
cleanup_remaining_timeline_fs_traces(conf, tenant.tenant_shard_id, timeline.timeline_id)
|
||||
.await?;
|
||||
|
||||
remove_timeline_from_tenant(tenant, timeline.timeline_id, &guard).await?;
|
||||
|
||||
*guard = Self::Finished;
|
||||
|
||||
@@ -130,7 +130,7 @@ pub(super) struct UploadQueueStopped {
|
||||
pub(crate) enum NotInitialized {
|
||||
#[error("queue is in state Uninitialized")]
|
||||
Uninitialized,
|
||||
#[error("queue is in state Stopped")]
|
||||
#[error("queue is in state Stopping")]
|
||||
Stopped,
|
||||
#[error("queue is shutting down")]
|
||||
ShuttingDown,
|
||||
|
||||
@@ -19,13 +19,14 @@ use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
|
||||
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
use utils::fs_ext;
|
||||
|
||||
pub use pageserver_api::models::virtual_file as api;
|
||||
pub(crate) mod io_engine;
|
||||
@@ -403,34 +404,47 @@ impl VirtualFile {
|
||||
Ok(vfile)
|
||||
}
|
||||
|
||||
/// Async version of [`::utils::crashsafe::overwrite`].
|
||||
/// Writes a file to the specified `final_path` in a crash safe fasion
|
||||
///
|
||||
/// # NB:
|
||||
///
|
||||
/// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
|
||||
/// it did at an earlier time.
|
||||
/// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
|
||||
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
|
||||
final_path: Utf8PathBuf,
|
||||
tmp_path: Utf8PathBuf,
|
||||
/// The file is first written to the specified tmp_path, and in a second
|
||||
/// step, the tmp path is renamed to the final path. As renames are
|
||||
/// atomic, a crash during the write operation will never leave behind a
|
||||
/// partially written file.
|
||||
pub async fn crashsafe_overwrite<B: BoundedBuf>(
|
||||
final_path: &Utf8Path,
|
||||
tmp_path: &Utf8Path,
|
||||
content: B,
|
||||
) -> std::io::Result<()> {
|
||||
// TODO: use tokio_epoll_uring if configured as `io_engine`.
|
||||
// See https://github.com/neondatabase/neon/issues/6663
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let slice_storage;
|
||||
let content_len = content.bytes_init();
|
||||
let content = if content.bytes_init() > 0 {
|
||||
slice_storage = Some(content.slice(0..content_len));
|
||||
slice_storage.as_deref().expect("just set it to Some()")
|
||||
} else {
|
||||
&[]
|
||||
};
|
||||
utils::crashsafe::overwrite(&final_path, &tmp_path, content)
|
||||
})
|
||||
.await
|
||||
.expect("blocking task is never aborted")
|
||||
let Some(final_path_parent) = final_path.parent() else {
|
||||
return Err(std::io::Error::from_raw_os_error(
|
||||
nix::errno::Errno::EINVAL as i32,
|
||||
));
|
||||
};
|
||||
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
|
||||
let mut file = Self::open_with_options(
|
||||
tmp_path,
|
||||
OpenOptions::new()
|
||||
.write(true)
|
||||
// Use `create_new` so that, if we race with ourselves or something else,
|
||||
// we bail out instead of causing damage.
|
||||
.create_new(true),
|
||||
)
|
||||
.await?;
|
||||
let (_content, res) = file.write_all(content).await;
|
||||
res?;
|
||||
file.sync_all().await?;
|
||||
drop(file); // before the rename, that's important!
|
||||
// renames are atomic
|
||||
std::fs::rename(tmp_path, final_path)?;
|
||||
// Only open final path parent dirfd now, so that this operation only
|
||||
// ever holds one VirtualFile fd at a time. That's important because
|
||||
// the current `find_victim_slot` impl might pick the same slot for both
|
||||
// VirtualFile., and it eventually does a blocking write lock instead of
|
||||
// try_lock.
|
||||
let final_parent_dirfd =
|
||||
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
|
||||
final_parent_dirfd.sync_all().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Call File::sync_all() on the underlying File.
|
||||
@@ -1323,7 +1337,7 @@ mod tests {
|
||||
let path = testdir.join("myfile");
|
||||
let tmp_path = testdir.join("myfile.tmp");
|
||||
|
||||
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
|
||||
@@ -1332,7 +1346,7 @@ mod tests {
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
|
||||
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
|
||||
@@ -1354,7 +1368,7 @@ mod tests {
|
||||
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
|
||||
assert!(tmp_path.exists());
|
||||
|
||||
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -334,12 +334,6 @@ impl WalIngest {
|
||||
{
|
||||
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
|
||||
}
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
|
||||
xlog_checkpoint.oldestActiveXid,
|
||||
self.checkpoint.oldestActiveXid
|
||||
);
|
||||
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
|
||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
||||
// if nothing really changed. Not strictly required, but it seems nice to
|
||||
@@ -366,13 +360,6 @@ impl WalIngest {
|
||||
}
|
||||
}
|
||||
}
|
||||
pg_constants::RM_STANDBY_ID => {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_RUNNING_XACTS {
|
||||
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
|
||||
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
|
||||
}
|
||||
}
|
||||
_x => {
|
||||
// TODO: should probably log & fail here instead of blindly
|
||||
// doing something without understanding the protocol
|
||||
|
||||
@@ -773,42 +773,6 @@ impl XlLogicalMessage {
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlRunningXacts {
|
||||
pub xcnt: u32,
|
||||
pub subxcnt: u32,
|
||||
pub subxid_overflow: bool,
|
||||
pub next_xid: TransactionId,
|
||||
pub oldest_running_xid: TransactionId,
|
||||
pub latest_completed_xid: TransactionId,
|
||||
pub xids: Vec<TransactionId>,
|
||||
}
|
||||
|
||||
impl XlRunningXacts {
|
||||
pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
|
||||
let xcnt = buf.get_u32_le();
|
||||
let subxcnt = buf.get_u32_le();
|
||||
let subxid_overflow = buf.get_u32_le() != 0;
|
||||
let next_xid = buf.get_u32_le();
|
||||
let oldest_running_xid = buf.get_u32_le();
|
||||
let latest_completed_xid = buf.get_u32_le();
|
||||
let mut xids = Vec::new();
|
||||
for _ in 0..(xcnt + subxcnt) {
|
||||
xids.push(buf.get_u32_le());
|
||||
}
|
||||
XlRunningXacts {
|
||||
xcnt,
|
||||
subxcnt,
|
||||
subxid_overflow,
|
||||
next_xid,
|
||||
oldest_running_xid,
|
||||
latest_completed_xid,
|
||||
xids,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Main routine to decode a WAL record and figure out which blocks are modified
|
||||
//
|
||||
// See xlogrecord.h for details
|
||||
|
||||
@@ -91,6 +91,7 @@ impl PostgresRedoManager {
|
||||
if rec_neon != batch_neon {
|
||||
let result = if batch_neon {
|
||||
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
|
||||
.await
|
||||
} else {
|
||||
self.apply_batch_postgres(
|
||||
key,
|
||||
@@ -111,6 +112,7 @@ impl PostgresRedoManager {
|
||||
// last batch
|
||||
if batch_neon {
|
||||
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
|
||||
.await
|
||||
} else {
|
||||
self.apply_batch_postgres(
|
||||
key,
|
||||
@@ -314,7 +316,7 @@ impl PostgresRedoManager {
|
||||
///
|
||||
/// Process a batch of WAL records using bespoken Neon code.
|
||||
///
|
||||
fn apply_batch_neon(
|
||||
async fn apply_batch_neon(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
@@ -332,9 +334,17 @@ impl PostgresRedoManager {
|
||||
anyhow::bail!("invalid neon WAL redo request with no base image");
|
||||
}
|
||||
|
||||
// Apply all the WAL records in the batch
|
||||
for (record_lsn, record) in records.iter() {
|
||||
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
|
||||
// process the records in batches and yield; this should guard against pathological
|
||||
// situations where we accidentially have a huge number of in-neon applied records.
|
||||
let yield_every = 200;
|
||||
|
||||
for records in records.chunks(yield_every) {
|
||||
// Apply all the WAL records in the batch
|
||||
for (record_lsn, record) in records {
|
||||
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
|
||||
}
|
||||
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
// Success!
|
||||
let duration = start_time.elapsed();
|
||||
|
||||
@@ -21,7 +21,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
EXTRA_CLEAN = \
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
\echo Use "ALTER EXTENSION neon UPDATE TO '1.2'" to load this file. \quit
|
||||
|
||||
-- Create a convenient view similar to pg_stat_database
|
||||
-- that exposes all lfc stat values in one row.
|
||||
CREATE OR REPLACE VIEW NEON_STAT_FILE_CACHE AS
|
||||
WITH lfc_stats AS (
|
||||
SELECT
|
||||
stat_name,
|
||||
count
|
||||
FROM neon_get_lfc_stats() AS t(stat_name text, count bigint)
|
||||
),
|
||||
lfc_values AS (
|
||||
SELECT
|
||||
MAX(CASE WHEN stat_name = 'file_cache_misses' THEN count ELSE NULL END) AS file_cache_misses,
|
||||
MAX(CASE WHEN stat_name = 'file_cache_hits' THEN count ELSE NULL END) AS file_cache_hits,
|
||||
MAX(CASE WHEN stat_name = 'file_cache_used' THEN count ELSE NULL END) AS file_cache_used,
|
||||
MAX(CASE WHEN stat_name = 'file_cache_writes' THEN count ELSE NULL END) AS file_cache_writes,
|
||||
-- Calculate the file_cache_hit_ratio within the same CTE for simplicity
|
||||
CASE
|
||||
WHEN MAX(CASE WHEN stat_name = 'file_cache_misses' THEN count ELSE 0 END) + MAX(CASE WHEN stat_name = 'file_cache_hits' THEN count ELSE 0 END) = 0 THEN NULL
|
||||
ELSE ROUND((MAX(CASE WHEN stat_name = 'file_cache_hits' THEN count ELSE 0 END)::DECIMAL /
|
||||
(MAX(CASE WHEN stat_name = 'file_cache_hits' THEN count ELSE 0 END) + MAX(CASE WHEN stat_name = 'file_cache_misses' THEN count ELSE 0 END))) * 100, 2)
|
||||
END AS file_cache_hit_ratio
|
||||
FROM lfc_stats
|
||||
)
|
||||
SELECT file_cache_misses, file_cache_hits, file_cache_used, file_cache_writes, file_cache_hit_ratio from lfc_values;
|
||||
|
||||
-- externalize the view to all users in role pg_monitor
|
||||
GRANT SELECT ON NEON_STAT_FILE_CACHE TO PG_MONITOR;
|
||||
237
pgxn/neon/neon.c
237
pgxn/neon/neon.c
@@ -37,8 +37,7 @@
|
||||
PG_MODULE_MAGIC;
|
||||
void _PG_init(void);
|
||||
|
||||
static int logical_replication_max_snap_files = 300;
|
||||
bool primary_is_running = false;
|
||||
static int logical_replication_max_time_lag = 3600;
|
||||
|
||||
static void
|
||||
InitLogicalReplicationMonitor(void)
|
||||
@@ -46,14 +45,14 @@ InitLogicalReplicationMonitor(void)
|
||||
BackgroundWorker bgw;
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.logical_replication_max_snap_files",
|
||||
"Maximum allowed logical replication .snap files",
|
||||
NULL,
|
||||
&logical_replication_max_snap_files,
|
||||
300, 0, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
"neon.logical_replication_max_time_lag",
|
||||
"Threshold for dropping unused logical replication slots",
|
||||
NULL,
|
||||
&logical_replication_max_time_lag,
|
||||
3600, 0, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_S,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
@@ -69,99 +68,22 @@ InitLogicalReplicationMonitor(void)
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
|
||||
static int
|
||||
LsnDescComparator(const void *a, const void *b)
|
||||
typedef struct
|
||||
{
|
||||
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
|
||||
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
|
||||
|
||||
if (lsn1 < lsn2)
|
||||
return 1;
|
||||
else if (lsn1 == lsn2)
|
||||
return 0;
|
||||
else
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Look at .snap files and calculate minimum allowed restart_lsn of slot so that
|
||||
* next gc would leave not more than logical_replication_max_snap_files; all
|
||||
* slots having lower restart_lsn should be dropped.
|
||||
*/
|
||||
static XLogRecPtr
|
||||
get_num_snap_files_lsn_threshold(void)
|
||||
{
|
||||
DIR *dirdesc;
|
||||
struct dirent *de;
|
||||
char *snap_path = "pg_logical/snapshots/";
|
||||
int cnt = 0;
|
||||
int lsns_allocated = 1024;
|
||||
int lsns_num = 0;
|
||||
XLogRecPtr *lsns;
|
||||
XLogRecPtr cutoff;
|
||||
|
||||
if (logical_replication_max_snap_files < 0)
|
||||
return 0;
|
||||
|
||||
lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
|
||||
|
||||
/* find all .snap files and get their lsns */
|
||||
dirdesc = AllocateDir(snap_path);
|
||||
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
uint32 hi;
|
||||
uint32 lo;
|
||||
|
||||
if (strcmp(de->d_name, ".") == 0 ||
|
||||
strcmp(de->d_name, "..") == 0)
|
||||
continue;
|
||||
|
||||
if (sscanf(de->d_name, "%X-%X.snap", &hi, &lo) != 2)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errmsg("could not parse file name as .snap file \"%s\"", de->d_name)));
|
||||
continue;
|
||||
}
|
||||
|
||||
lsn = ((uint64) hi) << 32 | lo;
|
||||
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
|
||||
if (lsns_allocated == lsns_num)
|
||||
{
|
||||
lsns_allocated *= 2;
|
||||
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
|
||||
}
|
||||
lsns[lsns_num++] = lsn;
|
||||
}
|
||||
/* sort by lsn desc */
|
||||
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
|
||||
/* and take cutoff at logical_replication_max_snap_files */
|
||||
if (logical_replication_max_snap_files > lsns_num)
|
||||
cutoff = 0;
|
||||
/* have less files than cutoff */
|
||||
else
|
||||
{
|
||||
cutoff = lsns[logical_replication_max_snap_files - 1];
|
||||
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
|
||||
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
|
||||
}
|
||||
pfree(lsns);
|
||||
FreeDir(dirdesc);
|
||||
return cutoff;
|
||||
}
|
||||
|
||||
#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */
|
||||
NameData name;
|
||||
bool dropped;
|
||||
XLogRecPtr confirmed_flush_lsn;
|
||||
TimestampTz last_updated;
|
||||
} SlotStatus;
|
||||
|
||||
/*
|
||||
* Unused logical replication slots pins WAL and prevents deletion of snapshots.
|
||||
* WAL bloat is guarded by max_slot_wal_keep_size; this bgw removes slots which
|
||||
* need too many .snap files.
|
||||
*/
|
||||
PGDLLEXPORT void
|
||||
LogicalSlotsMonitorMain(Datum main_arg)
|
||||
{
|
||||
TimestampTz now,
|
||||
last_checked;
|
||||
SlotStatus* slots;
|
||||
TimestampTz now, last_checked;
|
||||
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
@@ -170,105 +92,75 @@ LogicalSlotsMonitorMain(Datum main_arg)
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
slots = (SlotStatus*)calloc(max_replication_slots, sizeof(SlotStatus));
|
||||
last_checked = GetCurrentTimestamp();
|
||||
|
||||
for (;;)
|
||||
{
|
||||
XLogRecPtr cutoff_lsn;
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
|
||||
logical_replication_max_time_lag*1000/2,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/*
|
||||
* If there are too many .snap files, just drop all logical slots to
|
||||
* prevent aux files bloat.
|
||||
*/
|
||||
cutoff_lsn = get_num_snap_files_lsn_threshold();
|
||||
if (cutoff_lsn > 0)
|
||||
now = GetCurrentTimestamp();
|
||||
|
||||
if (now - last_checked > logical_replication_max_time_lag*USECS_PER_SEC)
|
||||
{
|
||||
int n_active_slots = 0;
|
||||
last_checked = now;
|
||||
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
for (int i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
char slot_name[NAMEDATALEN];
|
||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||
XLogRecPtr restart_lsn;
|
||||
|
||||
/* find the name */
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
/* Consider only logical repliction slots */
|
||||
if (!s->in_use || !SlotIsLogical(s))
|
||||
continue;
|
||||
|
||||
if (s->active_pid != 0)
|
||||
{
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
n_active_slots += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* do we need to drop it? */
|
||||
SpinLockAcquire(&s->mutex);
|
||||
restart_lsn = s->data.restart_lsn;
|
||||
SpinLockRelease(&s->mutex);
|
||||
if (restart_lsn >= cutoff_lsn)
|
||||
/* Check if there was some activity with the slot since last check */
|
||||
if (s->data.confirmed_flush != slots[i].confirmed_flush_lsn)
|
||||
{
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
continue;
|
||||
slots[i].confirmed_flush_lsn = s->data.confirmed_flush;
|
||||
slots[i].last_updated = now;
|
||||
}
|
||||
|
||||
strlcpy(slot_name, s->data.name.data, NAMEDATALEN);
|
||||
elog(LOG, "ls_monitor: dropping slot %s with restart_lsn %X/%X below horizon %X/%X",
|
||||
slot_name, LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(cutoff_lsn));
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
/* now try to drop it, killing owner before if any */
|
||||
for (;;)
|
||||
else if (now - slots[i].last_updated > logical_replication_max_time_lag*USECS_PER_SEC)
|
||||
{
|
||||
pid_t active_pid;
|
||||
slots[i].name = s->data.name;
|
||||
slots[i].dropped = true;
|
||||
}
|
||||
}
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
SpinLockAcquire(&s->mutex);
|
||||
active_pid = s->active_pid;
|
||||
SpinLockRelease(&s->mutex);
|
||||
|
||||
if (active_pid == 0)
|
||||
/*
|
||||
* If there are no active subscriptions, then no new snapshots are generated
|
||||
* and so no need to force slot deletion.
|
||||
*/
|
||||
if (n_active_slots != 0)
|
||||
{
|
||||
for (int i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
if (slots[i].dropped)
|
||||
{
|
||||
/*
|
||||
* Slot is releasted, try to drop it. Though of course
|
||||
* it could have been reacquired, so drop can ERROR
|
||||
* out. Similarly it could have been dropped in the
|
||||
* meanwhile.
|
||||
*
|
||||
* In principle we could remove pg_try/pg_catch, that
|
||||
* would restart the whole bgworker.
|
||||
*/
|
||||
ConditionVariableCancelSleep();
|
||||
PG_TRY();
|
||||
{
|
||||
ReplicationSlotDrop(slot_name, true);
|
||||
elog(LOG, "ls_monitor: slot %s dropped", slot_name);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/* log ERROR and reset elog stack */
|
||||
EmitErrorReport();
|
||||
FlushErrorState();
|
||||
elog(LOG, "ls_monitor: failed to drop slot %s", slot_name);
|
||||
}
|
||||
PG_END_TRY();
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* kill the owner and wait for release */
|
||||
elog(LOG, "ls_monitor: killing slot %s owner %d", slot_name, active_pid);
|
||||
(void) kill(active_pid, SIGTERM);
|
||||
/* We shouldn't get stuck, but to be safe add timeout. */
|
||||
ConditionVariableTimedSleep(&s->active_cv, 1000, WAIT_EVENT_REPLICATION_SLOT_DROP);
|
||||
elog(LOG, "Drop logical replication slot because it was not update more than %ld seconds",
|
||||
(now - slots[i].last_updated)/USECS_PER_SEC);
|
||||
ReplicationSlotDrop(slots[i].name.data, true);
|
||||
slots[i].dropped = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
|
||||
LS_MONITOR_CHECK_INTERVAL,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
@@ -289,15 +181,6 @@ _PG_init(void)
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"neon.primary_is_running",
|
||||
"true if the primary was running at replica startup. false otherwise",
|
||||
NULL,
|
||||
&primary_is_running,
|
||||
false,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
/*
|
||||
* Important: This must happen after other parts of the extension are
|
||||
* loaded, otherwise any settings to GUCs that were set before the
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# neon extension
|
||||
comment = 'cloud storage for PostgreSQL'
|
||||
default_version = '1.2'
|
||||
default_version = '1.1'
|
||||
module_pathname = '$libdir/neon'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
@@ -155,23 +155,12 @@ class NeonCompare(PgCompare):
|
||||
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
|
||||
)
|
||||
|
||||
metric_filters = {
|
||||
"tenant_id": str(self.tenant),
|
||||
"timeline_id": str(self.timeline),
|
||||
"file_kind": "layer",
|
||||
"op_kind": "upload",
|
||||
}
|
||||
# use `started` (not `finished`) counters here, because some callers
|
||||
# don't wait for upload queue to drain
|
||||
metric_filters = {"tenant_id": str(self.tenant), "timeline_id": str(self.timeline)}
|
||||
total_files = self.zenbenchmark.get_int_counter_value(
|
||||
self.env.pageserver,
|
||||
"pageserver_remote_timeline_client_calls_started_total",
|
||||
metric_filters,
|
||||
self.env.pageserver, "pageserver_created_persistent_files_total", metric_filters
|
||||
)
|
||||
total_bytes = self.zenbenchmark.get_int_counter_value(
|
||||
self.env.pageserver,
|
||||
"pageserver_remote_timeline_client_bytes_started_total",
|
||||
metric_filters,
|
||||
self.env.pageserver, "pageserver_written_persistent_bytes_total", metric_filters
|
||||
)
|
||||
self.zenbenchmark.record(
|
||||
"data_uploaded", total_bytes / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
|
||||
|
||||
@@ -147,6 +147,8 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_smgr_query_seconds_sum",
|
||||
"pageserver_storage_operations_seconds_count_total",
|
||||
"pageserver_storage_operations_seconds_sum_total",
|
||||
"pageserver_created_persistent_files_total",
|
||||
"pageserver_written_persistent_bytes_total",
|
||||
"pageserver_evictions_total",
|
||||
"pageserver_evictions_with_low_residence_duration_total",
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
|
||||
@@ -1104,13 +1104,6 @@ class NeonEnv:
|
||||
# bounce through retries on startup
|
||||
self.attachment_service.start()
|
||||
|
||||
def attachment_service_ready():
|
||||
assert self.attachment_service.ready() is True
|
||||
|
||||
# Wait for attachment service readiness to prevent unnecessary post start-up
|
||||
# reconcile.
|
||||
wait_until(30, 1, attachment_service_ready)
|
||||
|
||||
# Start up broker, pageserver and all safekeepers
|
||||
futs = []
|
||||
with concurrent.futures.ThreadPoolExecutor(
|
||||
@@ -3111,8 +3104,6 @@ class Endpoint(PgProtocol):
|
||||
# set small 'max_replication_write_lag' to enable backpressure
|
||||
# and make tests more stable.
|
||||
config_lines = ["max_replication_write_lag=15MB"] + config_lines
|
||||
|
||||
config_lines = ["neon.primary_is_running=on"] + config_lines
|
||||
self.config(config_lines)
|
||||
|
||||
return self
|
||||
@@ -3819,7 +3810,7 @@ def pytest_addoption(parser: Parser):
|
||||
|
||||
|
||||
SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
|
||||
r"config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)"
|
||||
r"config|config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)"
|
||||
)
|
||||
|
||||
|
||||
@@ -4156,21 +4147,6 @@ def tenant_get_shards(
|
||||
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
|
||||
|
||||
|
||||
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
primary_lsn = Lsn(
|
||||
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
|
||||
)
|
||||
while True:
|
||||
secondary_lsn = Lsn(
|
||||
secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
|
||||
)
|
||||
caught_up = secondary_lsn >= primary_lsn
|
||||
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
|
||||
if caught_up:
|
||||
return
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def wait_for_last_flush_lsn(
|
||||
env: NeonEnv,
|
||||
endpoint: Endpoint,
|
||||
|
||||
@@ -13,11 +13,6 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
|
||||
Information about image layers needed to collect old layers should
|
||||
be propagated by GC to compaction task which should take in in account
|
||||
when make a decision which new image layers needs to be created.
|
||||
|
||||
NB: this test demonstrates the problem. The source tree contained the
|
||||
`gc_feedback` mechanism for about 9 months, but, there were problems
|
||||
with it and it wasn't enabled at runtime.
|
||||
This PR removed the code: https://github.com/neondatabase/neon/pull/6863
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -166,6 +166,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"threshold": "23h",
|
||||
},
|
||||
"evictions_low_residence_duration_metric_threshold": "2days",
|
||||
"gc_feedback": True,
|
||||
"gc_horizon": 23 * (1024 * 1024),
|
||||
"gc_period": "2h 13m",
|
||||
"heatmap_period": "10m",
|
||||
|
||||
@@ -3,7 +3,22 @@ import re
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv
|
||||
|
||||
|
||||
def wait_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
primary_lsn = primary.safe_psql_scalar(
|
||||
"SELECT pg_current_wal_insert_lsn()::text", log_query=False
|
||||
)
|
||||
while True:
|
||||
secondary_lsn = secondary.safe_psql_scalar(
|
||||
"SELECT pg_last_wal_replay_lsn()", log_query=False
|
||||
)
|
||||
caught_up = secondary_lsn >= primary_lsn
|
||||
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
|
||||
if caught_up:
|
||||
return
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
@@ -64,7 +79,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
primary.safe_psql("create table t(key int, value text)")
|
||||
primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
wait_caughtup(primary, secondary)
|
||||
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import time
|
||||
from functools import partial
|
||||
from random import choice
|
||||
from string import ascii_lowercase
|
||||
|
||||
@@ -11,7 +10,7 @@ from fixtures.neon_fixtures import (
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
def random_string(n: int):
|
||||
@@ -158,51 +157,6 @@ COMMIT;
|
||||
assert endpoint.safe_psql("select count(*) from pg_replication_slots")[0][0] == 1
|
||||
|
||||
|
||||
# Test that neon.logical_replication_max_snap_files works
|
||||
def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
def slot_removed(ep):
|
||||
assert (
|
||||
endpoint.safe_psql(
|
||||
"select count(*) from pg_replication_slots where slot_name = 'stale_slot'"
|
||||
)[0][0]
|
||||
== 0
|
||||
)
|
||||
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("test_logical_replication", "empty")
|
||||
# set low neon.logical_replication_max_snap_files
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_logical_replication",
|
||||
config_lines=["log_statement=all", "neon.logical_replication_max_snap_files=1"],
|
||||
)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# create obsolete slot
|
||||
cur.execute("select pg_create_logical_replication_slot('stale_slot', 'pgoutput');")
|
||||
assert (
|
||||
endpoint.safe_psql(
|
||||
"select count(*) from pg_replication_slots where slot_name = 'stale_slot'"
|
||||
)[0][0]
|
||||
== 1
|
||||
)
|
||||
|
||||
# now insert some data and create and start live subscriber to create more .snap files
|
||||
# (in most cases this is not needed as stale_slot snap will have higher LSN than restart_lsn anyway)
|
||||
cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
cur.execute("create publication pub1 for table t")
|
||||
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
|
||||
connstr = endpoint.connstr().replace("'", "''")
|
||||
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
|
||||
|
||||
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))
|
||||
|
||||
|
||||
# Test compute start at LSN page of which starts with contrecord
|
||||
# https://github.com/neondatabase/neon/issues/5749
|
||||
def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
|
||||
@@ -15,7 +15,7 @@ def test_migrations(neon_simple_env: NeonEnv):
|
||||
|
||||
endpoint.wait_for_migrations()
|
||||
|
||||
num_migrations = 8
|
||||
num_migrations = 6
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("SELECT id FROM neon_migration.migration_id")
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
@@ -23,9 +22,4 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
|
||||
# IMPORTANT:
|
||||
# If the version has changed, the test should be updated.
|
||||
# Ensure that the default version is also updated in the neon.control file
|
||||
assert cur.fetchone() == ("1.2",)
|
||||
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
|
||||
res = cur.fetchall()
|
||||
log.info(res)
|
||||
assert len(res) == 1
|
||||
assert len(res[0]) == 5
|
||||
assert cur.fetchone() == ("1.1",)
|
||||
|
||||
@@ -228,9 +228,9 @@ def test_remote_storage_upload_queue_retries(
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": f"{64 * 1024}",
|
||||
"checkpoint_distance": f"{128 * 1024}",
|
||||
"compaction_threshold": "1",
|
||||
"compaction_target_size": f"{64 * 1024}",
|
||||
"compaction_target_size": f"{128 * 1024}",
|
||||
# no PITR horizon, we specify the horizon when we request on-demand GC
|
||||
"pitr_interval": "0s",
|
||||
# disable background compaction and GC. We invoke it manually when we want it to happen.
|
||||
@@ -256,24 +256,21 @@ def test_remote_storage_upload_queue_retries(
|
||||
]
|
||||
)
|
||||
|
||||
FOO_ROWS_COUNT = 4000
|
||||
|
||||
def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data):
|
||||
# create initial set of layers & upload them with failpoints configured
|
||||
for _v in range(2):
|
||||
endpoint.safe_psql_many(
|
||||
[
|
||||
f"""
|
||||
INSERT INTO foo (id, val)
|
||||
SELECT g, '{data}'
|
||||
FROM generate_series(1, {FOO_ROWS_COUNT}) g
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET val = EXCLUDED.val
|
||||
""",
|
||||
# to ensure that GC can actually remove some layers
|
||||
"VACUUM foo",
|
||||
]
|
||||
)
|
||||
endpoint.safe_psql_many(
|
||||
[
|
||||
f"""
|
||||
INSERT INTO foo (id, val)
|
||||
SELECT g, '{data}'
|
||||
FROM generate_series(1, 20000) g
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET val = EXCLUDED.val
|
||||
""",
|
||||
# to ensure that GC can actually remove some layers
|
||||
"VACUUM foo",
|
||||
]
|
||||
)
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
def get_queued_count(file_kind, op_kind):
|
||||
@@ -336,7 +333,7 @@ def test_remote_storage_upload_queue_retries(
|
||||
|
||||
# The churn thread doesn't make progress once it blocks on the first wait_completion() call,
|
||||
# so, give it some time to wrap up.
|
||||
churn_while_failpoints_active_thread.join(60)
|
||||
churn_while_failpoints_active_thread.join(30)
|
||||
assert not churn_while_failpoints_active_thread.is_alive()
|
||||
assert churn_thread_result[0]
|
||||
|
||||
@@ -368,7 +365,7 @@ def test_remote_storage_upload_queue_retries(
|
||||
log.info("restarting postgres to validate")
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
with endpoint.cursor() as cur:
|
||||
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == FOO_ROWS_COUNT
|
||||
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 20000
|
||||
|
||||
|
||||
def test_remote_timeline_client_calls_started_metric(
|
||||
@@ -697,8 +694,10 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
|
||||
# index upload is now hitting the failpoint, it should block the shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
timeline_dir = env.pageserver.timeline_dir(env.initial_tenant, new_branch_timeline_id)
|
||||
assert timeline_dir.is_dir()
|
||||
local_metadata = (
|
||||
env.pageserver.timeline_dir(env.initial_tenant, new_branch_timeline_id) / "metadata"
|
||||
)
|
||||
assert local_metadata.is_file()
|
||||
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
|
||||
|
||||
def test_replication_start(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
p_cur.execute("begin")
|
||||
p_cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
|
||||
p_cur.execute("select txid_current()")
|
||||
xid = p_cur.fetchall()[0][0]
|
||||
log.info(f"Master transaction {xid}")
|
||||
with env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary"
|
||||
) as secondary:
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
# Enforce setting hint bits for pg_class tuples.
|
||||
# If master's transaction is not marked as in-progress in MVCC snapshot,
|
||||
# then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible.
|
||||
s_cur.execute("select * from pg_class")
|
||||
p_cur.execute("commit")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
s_cur.execute("select * from t where pk = 1")
|
||||
assert s_cur.fetchone() == (1, 0)
|
||||
@@ -299,7 +299,8 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# tenant is created with defaults, as in without config file
|
||||
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
|
||||
config_path = env.pageserver.tenant_dir(tenant_id) / "config-v1"
|
||||
config_path = env.pageserver.tenant_dir(tenant_id) / "config"
|
||||
assert config_path.exists(), "config file is always initially created"
|
||||
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
|
||||
@@ -130,6 +130,7 @@ FAILPOINTS = [
|
||||
"timeline-delete-before-index-deleted-at",
|
||||
"timeline-delete-before-rm",
|
||||
"timeline-delete-before-index-delete",
|
||||
"timeline-delete-after-rm-dir",
|
||||
]
|
||||
|
||||
FAILPOINTS_BEFORE_BACKGROUND = [
|
||||
|
||||
@@ -157,7 +157,10 @@ def switch_pg_to_new_pageserver(
|
||||
timeline_to_detach_local_path = origin_ps.timeline_dir(tenant_id, timeline_id)
|
||||
files_before_detach = os.listdir(timeline_to_detach_local_path)
|
||||
assert (
|
||||
len(files_before_detach) >= 1
|
||||
"metadata" in files_before_detach
|
||||
), f"Regular timeline {timeline_to_detach_local_path} should have the metadata file, but got: {files_before_detach}"
|
||||
assert (
|
||||
len(files_before_detach) >= 2
|
||||
), f"Regular timeline {timeline_to_detach_local_path} should have at least one layer file, but got {files_before_detach}"
|
||||
|
||||
return timeline_to_detach_local_path
|
||||
|
||||
@@ -136,9 +136,12 @@ DELETE_FAILPOINTS = [
|
||||
"timeline-delete-before-index-deleted-at",
|
||||
"timeline-delete-before-schedule",
|
||||
"timeline-delete-before-rm",
|
||||
"timeline-delete-during-rm",
|
||||
"timeline-delete-after-rm",
|
||||
"timeline-delete-before-index-delete",
|
||||
"timeline-delete-after-index-delete",
|
||||
"timeline-delete-after-rm-metadata",
|
||||
"timeline-delete-after-rm-dir",
|
||||
]
|
||||
|
||||
|
||||
@@ -798,7 +801,7 @@ def test_timeline_delete_resumed_on_attach(
|
||||
)
|
||||
|
||||
# failpoint before we remove index_part from s3
|
||||
failpoint = "timeline-delete-after-rm"
|
||||
failpoint = "timeline-delete-during-rm"
|
||||
ps_http.configure_failpoints((failpoint, "return"))
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 4cdba8ec5a...9dd9956c55
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 0ec04712d5...ca2def9993
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: cc98378b0f...9c37a49884
7
vendor/revisions.json
vendored
7
vendor/revisions.json
vendored
@@ -1,6 +1,5 @@
|
||||
{
|
||||
"postgres-v16": "cc98378b0fa7413b78a197e3292a806865e4056a",
|
||||
"postgres-v15": "0ec04712d55539550278595e853c172f7aa5fe3e",
|
||||
"postgres-v14": "4cdba8ec5a3868cec4826bbb3f16c1d3d2ac2283"
|
||||
"postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269",
|
||||
"postgres-v15": "ca2def999368d9df098a637234ad5a9003189463",
|
||||
"postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5"
|
||||
}
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ files:
|
||||
|
||||
- metric_name: lfc_used
|
||||
type: gauge
|
||||
help: 'LFC chunks used (chunk = 1MB)'
|
||||
help: 'lfc_used'
|
||||
key_labels:
|
||||
values: [lfc_used]
|
||||
query: |
|
||||
@@ -124,14 +124,6 @@ files:
|
||||
query: |
|
||||
select lfc_value as lfc_writes from neon.neon_lfc_stats where lfc_key='file_cache_writes';
|
||||
|
||||
- metric_name: lfc_cache_size_limit
|
||||
type: gauge
|
||||
help: 'LFC cache size limit in bytes'
|
||||
key_labels:
|
||||
values: [lfc_cache_size_limit]
|
||||
query: |
|
||||
select pg_size_bytes(current_setting('neon.file_cache_size_limit')) as lfc_cache_size_limit;
|
||||
|
||||
build: |
|
||||
# Build cgroup-tools
|
||||
#
|
||||
|
||||
Reference in New Issue
Block a user