Compare commits

7 Commits

Author SHA1 Message Date
Alex Chi
1863ae799d fix blocking read
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-21 10:17:42 -04:00
Alex Chi
20fe57d93b refactor: use immutable storage state in timeline
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-14 16:03:41 -04:00
Alex Chi
0fad5e21ce Merge remote-tracking branch 'origin/skyzh/layermap-imm' into skyzh/immutable-storage
2023-06-14 15:29:59 -04:00
Alex Chi
a2056666ae pgserver: move mapping logic to layer cache
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-14 15:07:38 -04:00
Alex Chi
a3909e03f8 pgserver: add immutable layer map manager
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 16:25:27 -04:00
Alex Chi
fc190a2a19 resolve merge conflicts
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:56:50 -04:00
Alex Chi
faee3152f3 refactor: use LayerDesc in LayerMap (part 2)
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:54:59 -04:00
98 changed files with 1956 additions and 3075 deletions
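The headline change in this range is the switch to an immutable, atomically swapped storage state in the pageserver timeline, backed by the newly added arc-swap dependency (see the Cargo.toml hunk below). A minimal sketch of that read-copy-update pattern, using a hypothetical StorageState type rather than the pageserver's actual code:

use std::sync::Arc;

use arc_swap::ArcSwap;

// Hypothetical immutable state; the real code wraps the timeline's layer map.
struct StorageState {
    layer_count: usize,
}

fn main() {
    let state = ArcSwap::from_pointee(StorageState { layer_count: 0 });

    // Readers take a cheap snapshot; no lock is held while they use it.
    let snapshot = state.load();
    assert_eq!(snapshot.layer_count, 0);

    // Writers build a new immutable value and publish it atomically.
    state.rcu(|old| Arc::new(StorageState { layer_count: old.layer_count + 1 }));
    assert_eq!(state.load().layer_count, 1);
}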

View File

@@ -180,8 +180,7 @@ jobs:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
# Increase timeout to 8h, default timeout is 6h
timeout-minutes: 480
timeout-minutes: 360 # 6h
steps:
- uses: actions/checkout@v3
@@ -322,6 +321,8 @@ jobs:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
timeout-minutes: 360 # 6h
steps:
- uses: actions/checkout@v3
@@ -413,6 +414,8 @@ jobs:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
timeout-minutes: 360 # 6h
steps:
- uses: actions/checkout@v3
@@ -498,6 +501,8 @@ jobs:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
timeout-minutes: 360 # 6h
steps:
- uses: actions/checkout@v3

View File

@@ -623,6 +623,51 @@ jobs:
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
neon-image-depot:
# For testing, this will run side-by-side for a few merges.
# This action is not really optimized yet, but gets the job done
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
permissions:
contents: read
id-token: write
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Setup go
uses: actions/setup-go@v3
with:
go-version: '1.19'
- name: Set up Depot CLI
uses: depot/setup-action@v1
- name: Install Crane & ECR helper
run: go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
- name: Configure ECR login
run: |
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Build and push
uses: depot/build-push-action@v1
with:
# if no depot.json file is at the root of your repo, you must specify the project id
project: nrdv0s4kcs
push: true
tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}}
build-args: |
GIT_VERSION=${{ github.sha }}
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
@@ -659,7 +704,6 @@ jobs:
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--dockerfile Dockerfile.compute-tools
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
@@ -717,40 +761,10 @@ jobs:
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg PG_VERSION=${{ matrix.version }}
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--dockerfile Dockerfile.compute-node
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--cleanup
# Due to a kaniko bug, we can't use the cache for the extensions image, so it takes about the same amount of time to build as the compute-node image (~10 min)
# During the transition period we need to have extensions in both places (in S3 and in the compute-node image),
# so we don't build the extensions twice, but extract them from compute-node.
#
# For now we use the extensions image only for new custom extensions
- name: Kaniko build extensions only
run: |
# Kaniko is supposed to clean up after itself if the --cleanup flag is set, but it doesn't.
# Although some fixes were made in https://github.com/GoogleContainerTools/kaniko/pull/2504 (in kaniko v1.11.0),
# it still fails with the error:
# error building image: could not save file: copying file: symlink postgres /kaniko/1/usr/local/pgsql/bin/postmaster: file exists
#
# Ref https://github.com/GoogleContainerTools/kaniko/issues/1406
find /kaniko -maxdepth 1 -mindepth 1 -type d -regex "/kaniko/[0-9]*" -exec rm -rv {} \;
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true \
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache \
--context . \
--build-arg GIT_VERSION=${{ github.sha }} \
--build-arg PG_VERSION=${{ matrix.version }} \
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}} \
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com \
--dockerfile Dockerfile.compute-node \
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
--destination neondatabase/extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
--cleanup \
--target postgres-extensions
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
@@ -767,7 +781,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.11.0
VM_BUILDER_VERSION: v0.8.0
steps:
- name: Checkout
@@ -869,10 +883,8 @@ jobs:
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-v15:${{needs.tag.outputs.build-tag}} latest
- name: Push images to production ECR
if: |
@@ -883,10 +895,8 @@ jobs:
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-v14:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/extensions-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/extensions-v15:latest
- name: Configure Docker Hub login
run: |
@@ -908,89 +918,16 @@ jobs:
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/extensions-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/extensions-v15:${{needs.tag.outputs.build-tag}} latest
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
upload-postgres-extensions-to-s3:
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
runs-on: ${{ github.ref_name == 'release' && fromJSON('["self-hosted", "prod", "x64"]') || fromJSON('["self-hosted", "gen3", "small"]') }}
needs: [ tag, promote-images ]
strategy:
fail-fast: false
matrix:
version: [ v14, v15 ]
env:
# During the transition period we extract public extensions from the compute-node image and custom extensions from the extensions image.
# Later all the extensions will be moved to the extensions image.
EXTENSIONS_IMAGE: ${{ github.ref_name == 'release' && '093970136003' || '369495373322'}}.dkr.ecr.eu-central-1.amazonaws.com/extensions-${{ matrix.version }}:latest
COMPUTE_NODE_IMAGE: ${{ github.ref_name == 'release' && '093970136003' || '369495373322'}}.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:latest
AWS_ACCESS_KEY_ID: ${{ github.ref_name == 'release' && secrets.AWS_ACCESS_KEY_PROD || secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ github.ref_name == 'release' && secrets.AWS_SECRET_KEY_PROD || secrets.AWS_SECRET_KEY_DEV }}
S3_BUCKETS: |
${{ github.ref_name == 'release' &&
'neon-prod-extensions-ap-southeast-1 neon-prod-extensions-eu-central-1 neon-prod-extensions-us-east-1 neon-prod-extensions-us-east-2 neon-prod-extensions-us-west-2' ||
'neon-dev-extensions-eu-central-1 neon-dev-extensions-eu-west-1 neon-dev-extensions-us-east-2' }}
steps:
- name: Pull postgres-extensions image
run: |
docker pull ${EXTENSIONS_IMAGE}
docker pull ${COMPUTE_NODE_IMAGE}
- name: Create postgres-extensions container
id: create-container
run: |
EID=$(docker create ${EXTENSIONS_IMAGE} true)
echo "EID=${EID}" >> $GITHUB_OUTPUT
CID=$(docker create ${COMPUTE_NODE_IMAGE} true)
echo "CID=${CID}" >> $GITHUB_OUTPUT
- name: Extract postgres-extensions from container
run: |
rm -rf ./extensions-to-upload ./custom-extensions # Just in case
# The compute image has a slightly different directory layout
mkdir -p extensions-to-upload/share
docker cp ${{ steps.create-container.outputs.CID }}:/usr/local/share/extension ./extensions-to-upload/share/extension
docker cp ${{ steps.create-container.outputs.CID }}:/usr/local/lib ./extensions-to-upload/lib
# Delete Neon extensions (they are always present in the compute-node image)
rm -rf ./extensions-to-upload/share/extension/neon*
rm -rf ./extensions-to-upload/lib/neon*
docker cp ${{ steps.create-container.outputs.EID }}:/extensions ./custom-extensions
for EXT_NAME in $(ls ./custom-extensions); do
mkdir -p ./extensions-to-upload/${EXT_NAME}/share
mv ./custom-extensions/${EXT_NAME}/share/extension ./extensions-to-upload/${EXT_NAME}/share/extension
mv ./custom-extensions/${EXT_NAME}/lib ./extensions-to-upload/${EXT_NAME}/lib
done
- name: Upload postgres-extensions to S3
run: |
for BUCKET in $(echo ${S3_BUCKETS}); do
aws s3 cp --recursive --only-show-errors ./extensions-to-upload s3://${BUCKET}/${{ needs.tag.outputs.build-tag }}/${{ matrix.version }}
done
- name: Cleanup
if: ${{ always() && (steps.create-container.outputs.CID || steps.create-container.outputs.EID) }}
run: |
docker rm ${{ steps.create-container.outputs.CID }} || true
docker rm ${{ steps.create-container.outputs.EID }} || true
deploy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
needs: [ upload-postgres-extensions-to-s3, promote-images, tag, regress-tests ]
needs: [ promote-images, tag, regress-tests ]
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
steps:
- name: Fix git ownership
@@ -1022,20 +959,6 @@ jobs:
exit 1
fi
- name: Create git tag
if: github.ref_name == 'release'
uses: actions/github-script@v6
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
github.rest.git.createRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: "refs/tags/${{ needs.tag.outputs.build-tag }}",
sha: context.sha,
})
promote-compatibility-data:
runs-on: [ self-hosted, gen3, small ]
container:

View File

@@ -3,7 +3,6 @@ name: Create Release Branch
on:
schedule:
- cron: '0 10 * * 2'
workflow_dispatch:
jobs:
create_release_branch:

Cargo.lock generated
View File

@@ -110,6 +110,12 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "archery"
version = "0.5.0"
@@ -2349,9 +2355,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "openssl"
version = "0.10.55"
version = "0.10.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d"
checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56"
dependencies = [
"bitflags",
"cfg-if",
@@ -2381,9 +2387,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.90"
version = "0.9.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6"
checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e"
dependencies = [
"cc",
"libc",
@@ -2542,6 +2548,7 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-stream",
"async-trait",
"byteorder",
@@ -2770,7 +2777,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"bytes",
"fallible-iterator",
@@ -2783,7 +2790,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"native-tls",
"tokio",
@@ -2794,7 +2801,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -2812,7 +2819,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4223,7 +4230,8 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.28.1"
source = "git+https://github.com/problame/tokio.git?branch=problame/distinguish-core-and-worker-by-thread-name#d88791686cfc7fc7d010889ad7638d09646b3de7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
dependencies = [
"autocfg",
"bytes",
@@ -4250,7 +4258,8 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.1.0"
source = "git+https://github.com/problame/tokio.git?branch=problame/distinguish-core-and-worker-by-thread-name#d88791686cfc7fc7d010889ad7638d09646b3de7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
@@ -4270,7 +4279,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"async-trait",
"byteorder",

View File

@@ -32,6 +32,7 @@ license = "Apache-2.0"
## All dependency versions, used in the project
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-stream = "0.3"
async-trait = "0.1"
atty = "0.2.14"
@@ -140,11 +141,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Other git libraries
@@ -180,15 +181,13 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
# Changes the MAX_THREADS limit from 4096 to 32768.
# This is a temporary workaround for using tracing from many threads in safekeepers code,
# until the async safekeepers patch is merged to main.
sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" }
tokio = { git = "https://github.com/problame/tokio.git", branch="problame/distinguish-core-and-worker-by-thread-name" }
################# Binary contents sections
[profile.release]

View File

@@ -2,7 +2,6 @@ ARG PG_VERSION
ARG REPOSITORY=neondatabase
ARG IMAGE=rust
ARG TAG=pinned
ARG BUILD_TAG
#########################################################################################
#
@@ -481,60 +480,6 @@ RUN wget https://github.com/rdkit/rdkit/archive/refs/tags/Release_2023_03_1.tar.
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/rdkit.control
#########################################################################################
#
# Layer "pg-uuidv7-pg-build"
# compile pg_uuidv7 extension
#
#########################################################################################
FROM build-deps AS pg-uuidv7-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.0.1.tar.gz -O pg_uuidv7.tar.gz && \
echo "0d0759ab01b7fb23851ecffb0bce27822e1868a4a5819bfd276101c716637a7a pg_uuidv7.tar.gz" | sha256sum --check && \
mkdir pg_uuidv7-src && cd pg_uuidv7-src && tar xvzf ../pg_uuidv7.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_uuidv7.control
#########################################################################################
#
# Layer "pg-roaringbitmap-pg-build"
# compile pg_roaringbitmap extension
#
#########################################################################################
FROM build-deps AS pg-roaringbitmap-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
echo "b75201efcb1c2d1b014ec4ae6a22769cc7a224e6e406a587f5784a37b6b5a2aa pg_roaringbitmap.tar.gz" | sha256sum --check && \
mkdir pg_roaringbitmap-src && cd pg_roaringbitmap-src && tar xvzf ../pg_roaringbitmap.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/roaringbitmap.control
#########################################################################################
#
# Layer "pg-anon-pg-build"
# compile anon extension
#
#########################################################################################
FROM build-deps AS pg-anon-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# Kaniko doesn't allow `${from#/usr/local/pgsql/}`, so we use `${from:17}` instead
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/1.1.0/postgresql_anonymizer-1.1.0.tar.gz -O pg_anon.tar.gz && \
echo "08b09d2ff9b962f96c60db7e6f8e79cf7253eb8772516998fc35ece08633d3ad pg_anon.tar.gz" | sha256sum --check && \
mkdir pg_anon-src && cd pg_anon-src && tar xvzf ../pg_anon.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sort > /before.txt && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control && \
find /usr/local/pgsql -type f | sort > /after.txt && \
/bin/bash -c 'for from in $(comm -13 /before.txt /after.txt); do to=/extensions/anon/${from:17} && mkdir -p $(dirname ${to}) && cp -a ${from} ${to}; done'
#########################################################################################
#
# Layer "rust extensions"
@@ -643,7 +588,6 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.0.tar.gz -
#
#########################################################################################
FROM build-deps AS neon-pg-ext-build
# Public extensions
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=postgis-build /sfcgal/* /
COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -669,8 +613,6 @@ COPY --from=kq-imcx-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -692,9 +634,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
@@ -719,22 +658,6 @@ RUN rm -r /usr/local/pgsql/include
# if they were to be used by other libraries.
RUN rm /usr/local/pgsql/lib/lib*.a
#########################################################################################
#
# Extension only
#
#########################################################################################
FROM scratch AS postgres-extensions
# After the transition this layer will include all extensions.
# For now, it's only for new custom ones
#
# # Default extensions
# COPY --from=postgres-cleanup-layer /usr/local/pgsql/share/extension /usr/local/pgsql/share/extension
# COPY --from=postgres-cleanup-layer /usr/local/pgsql/lib /usr/local/pgsql/lib
# Custom extensions
COPY --from=pg-anon-pg-build /extensions/anon/lib/ /extensions/anon/lib
COPY --from=pg-anon-pg-build /extensions/anon/share/extension /extensions/anon/share/extension
#########################################################################################
#
# Final layer

View File

@@ -3,7 +3,6 @@
ARG REPOSITORY=neondatabase
ARG IMAGE=rust
ARG TAG=pinned
ARG BUILD_TAG
FROM $REPOSITORY/$IMAGE:$TAG AS rust-build
WORKDIR /home/nonroot
@@ -17,8 +16,6 @@ ENV CACHEPOT_S3_KEY_PREFIX=cachepot
ARG CACHEPOT_BUCKET=neon-github-dev
#ARG AWS_ACCESS_KEY_ID
#ARG AWS_SECRET_ACCESS_KEY
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
COPY . .

View File

@@ -54,15 +54,9 @@ use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
const BUILD_TAG_DEFAULT: &str = "local";
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let build_tag = option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT);
info!("build_tag: {build_tag}");
let matches = cli().get_matches();
let http_port = *matches

View File

@@ -133,84 +133,6 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
}
}
/// Create the special neon_superuser role, a slightly nerfed version of a real superuser,
/// which we give to customers
fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let roles = spec
.cluster
.roles
.iter()
.map(|r| format!("'{}'", escape_literal(&r.name)))
.collect::<Vec<_>>();
let dbs = spec
.cluster
.databases
.iter()
.map(|db| format!("'{}'", escape_literal(&db.name)))
.collect::<Vec<_>>();
let roles_decl = if roles.is_empty() {
String::from("roles text[] := NULL;")
} else {
format!(
r#"
roles text[] := ARRAY(SELECT rolname
FROM pg_catalog.pg_roles
WHERE rolname IN ({}));"#,
roles.join(", ")
)
};
let database_decl = if dbs.is_empty() {
String::from("dbs text[] := NULL;")
} else {
format!(
r#"
dbs text[] := ARRAY(SELECT datname
FROM pg_catalog.pg_database
WHERE datname IN ({}));"#,
dbs.join(", ")
)
};
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
let query = format!(
r#"
DO $$
DECLARE
r text;
{}
{}
BEGIN
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
THEN
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data;
IF array_length(roles, 1) IS NOT NULL THEN
EXECUTE format('GRANT neon_superuser TO %s',
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
FOREACH r IN ARRAY roles LOOP
EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
END LOOP;
END IF;
IF array_length(dbs, 1) IS NOT NULL THEN
EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
END IF;
END IF;
END
$$;"#,
roles_decl, database_decl,
);
info!("Neon superuser created:\n{}", &query);
client
.simple_query(&query)
.map_err(|e| anyhow::anyhow!(e).context(query))?;
Ok(())
}
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
let mut state = self.state.lock().unwrap();
@@ -235,7 +157,7 @@ impl ComputeNode {
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip_all, fields(%lsn))]
#[instrument(skip(self, compute_state))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Utc::now();
@@ -277,7 +199,7 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip_all)]
#[instrument(skip(self, storage_auth_token))]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now();
@@ -322,7 +244,7 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
@@ -380,7 +302,7 @@ impl ComputeNode {
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip_all)]
#[instrument(skip(self))]
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
@@ -404,7 +326,7 @@ impl ComputeNode {
}
/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
#[instrument(skip(self, compute_state))]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
@@ -425,8 +347,6 @@ impl ComputeNode {
.map_err(|_| anyhow::anyhow!("invalid connstr"))?;
let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
// Disable forwarding so that users don't get a cloud_admin role
client.simple_query("SET neon.forward_ddl = false")?;
client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
client.simple_query("GRANT zenith_admin TO cloud_admin")?;
drop(client);
@@ -437,16 +357,14 @@ impl ComputeNode {
Ok(client) => client,
};
// Proceed with post-startup configuration. Note that the order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
client.simple_query("SET neon.forward_ddl = false")?;
// Proceed with post-startup configuration. Note that the order of operations is important.
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
create_neon_superuser(spec, &mut client)?;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str())?;
handle_grants(spec, self.connstr.as_str(), &mut client)?;
handle_extensions(spec, &mut client)?;
// 'Close' connection
@@ -458,7 +376,7 @@ impl ComputeNode {
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip_all)]
#[instrument(skip(self, client))]
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
@@ -466,7 +384,7 @@ impl ComputeNode {
/// Similar to `apply_config()`, but does a bit different sequence of operations,
/// as it's used to reconfigure a previously started and configured Postgres node.
#[instrument(skip_all)]
#[instrument(skip(self))]
pub fn reconfigure(&self) -> Result<()> {
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
@@ -484,7 +402,7 @@ impl ComputeNode {
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str())?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
}
@@ -501,7 +419,7 @@ impl ComputeNode {
Ok(())
}
#[instrument(skip_all)]
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");

View File

@@ -8,7 +8,7 @@ use compute_api::responses::ComputeStatus;
use crate::compute::ComputeNode;
#[instrument(skip_all)]
#[instrument(skip(compute))]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {

View File

@@ -17,7 +17,7 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Escape a string for including it in a SQL literal
pub fn escape_literal(s: &str) -> String {
fn escape_literal(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
}
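For reference, the escaping rule here is just doubling single quotes and backslashes before embedding a string in a SQL literal; a standalone sketch with illustrative assertions (not part of the crate):

fn escape_literal(s: &str) -> String {
    s.replace('\'', "''").replace('\\', "\\\\")
}

fn main() {
    // Single quotes are doubled, backslashes are doubled.
    assert_eq!(escape_literal("O'Brien"), "O''Brien");
    assert_eq!(escape_literal(r"a\b"), r"a\\b");
}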
@@ -215,7 +215,7 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
/// Wait for Postgres to become ready to accept connections. It's ready to
/// accept connections when the state-field in `pgdata/postmaster.pid` says
/// 'ready'.
#[instrument(skip_all, fields(pgdata = %pgdata.display()))]
#[instrument(skip(pg))]
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
let pid_path = pgdata.join("postmaster.pid");

View File

@@ -269,13 +269,17 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
xact.execute(query.as_str(), &[])?;
}
RoleAction::Create => {
let mut query: String = format!(
"CREATE ROLE {} CREATEROLE CREATEDB IN ROLE neon_superuser",
name.pg_quote()
);
let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
info!("role create query: '{}'", &query);
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
let grant_query = format!(
"GRANT pg_read_all_data, pg_write_all_data TO {}",
name.pg_quote()
);
xact.execute(grant_query.as_str(), &[])?;
info!("role grant query: '{}'", &grant_query);
}
}
@@ -472,11 +476,6 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
query.push_str(&db.to_pg_options());
let _guard = info_span!("executing", query).entered();
client.execute(query.as_str(), &[])?;
let grant_query: String = format!(
"GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
name.pg_quote()
);
client.execute(grant_query.as_str(), &[])?;
}
};
@@ -496,9 +495,35 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
#[instrument(skip_all)]
pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> {
pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
info!("cluster spec grants:");
// We now have a separate `web_access` role to connect to the database
// via the web interface and proxy link auth. We also grant
// read / write all data privileges to every role, so grant
// CREATE to everyone as well.
// XXX: later we should stop messing with the Postgres ACL in such horrible
// ways.
let roles = spec
.cluster
.roles
.iter()
.map(|r| r.name.pg_quote())
.collect::<Vec<_>>();
for db in &spec.cluster.databases {
let dbname = &db.name;
let query: String = format!(
"GRANT CREATE ON DATABASE {} TO {}",
dbname.pg_quote(),
roles.join(", ")
);
info!("grant query {}", &query);
client.execute(query.as_str(), &[])?;
}
// Do some per-database access adjustments. Ideally we'd do this at database
// creation time, but CREATE DATABASE isn't transactional, so we cannot create
// the database and apply grants atomically.

View File

@@ -308,8 +308,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
let mut env =
LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
let force = init_match.get_flag("force");
env.init(pg_version, force)
env.init(pg_version)
.context("Failed to initialize neon repository")?;
// Initialize pageserver, create initial tenant and timeline.
@@ -1014,13 +1013,6 @@ fn cli() -> Command {
.help("If set, the node will be a hot replica on the specified timeline")
.required(false);
let force_arg = Arg::new("force")
.value_parser(value_parser!(bool))
.long("force")
.action(ArgAction::SetTrue)
.help("Force initialization even if the repository is not empty")
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1036,7 +1028,6 @@ fn cli() -> Command {
.value_name("config"),
)
.arg(pg_version_arg.clone())
.arg(force_arg)
)
.subcommand(
Command::new("timeline")

View File

@@ -67,7 +67,6 @@ pub struct EndpointConf {
pg_port: u16,
http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
}
//
@@ -136,7 +135,6 @@ impl ComputeControlPlane {
mode,
tenant_id,
pg_version,
skip_pg_catalog_updates: false,
});
ep.create_endpoint_dir()?;
@@ -150,7 +148,6 @@ impl ComputeControlPlane {
http_port,
pg_port,
pg_version,
skip_pg_catalog_updates: false,
})?,
)?;
std::fs::write(
@@ -186,9 +183,6 @@ pub struct Endpoint {
// the endpoint runs in.
pub env: LocalEnv,
pageserver: Arc<PageServerNode>,
// Optimizations
skip_pg_catalog_updates: bool,
}
impl Endpoint {
@@ -222,7 +216,6 @@ impl Endpoint {
mode: conf.mode,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
})
}
@@ -457,7 +450,7 @@ impl Endpoint {
// Create spec file
let spec = ComputeSpec {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
skip_pg_catalog_updates: false,
format_version: 1.0,
operation_uuid: None,
cluster: Cluster {

View File

@@ -364,7 +364,7 @@ impl LocalEnv {
//
// Initialize a new Neon repository
//
pub fn init(&mut self, pg_version: u32, force: bool) -> anyhow::Result<()> {
pub fn init(&mut self, pg_version: u32) -> anyhow::Result<()> {
// check if config already exists
let base_path = &self.base_data_dir;
ensure!(
@@ -372,29 +372,11 @@ impl LocalEnv {
"repository base path is missing"
);
if base_path.exists() {
if force {
println!("removing all contents of '{}'", base_path.display());
// instead of directly calling `remove_dir_all`, we keep the original dir but remove
// all contents inside. This helps if the developer symlinks another directory (e.g.,
// a local SSD) to the `.neon` base directory.
for entry in std::fs::read_dir(base_path)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
fs::remove_dir_all(&path)?;
} else {
fs::remove_file(&path)?;
}
}
} else {
bail!(
"directory '{}' already exists. Perhaps already initialized? (Hint: use --force to remove all contents)",
base_path.display()
);
}
}
ensure!(
!base_path.exists(),
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
bail!(
"Can't find postgres binary at {}",
@@ -410,9 +392,7 @@ impl LocalEnv {
}
}
if !base_path.exists() {
fs::create_dir(base_path)?;
}
fs::create_dir(base_path)?;
// Generate keypair for JWT.
//

View File

@@ -148,14 +148,4 @@ mod tests {
let file = File::open("tests/cluster_spec.json").unwrap();
let _spec: ComputeSpec = serde_json::from_reader(file).unwrap();
}
#[test]
fn parse_unknown_fields() {
// Forward compatibility test
let file = File::open("tests/cluster_spec.json").unwrap();
let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
let ob = json.as_object_mut().unwrap();
ob.insert("unknown_field_123123123".into(), "hello".into());
let _spec: ComputeSpec = serde_json::from_value(json).unwrap();
}
}
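The removed test covered serde's default behavior of silently ignoring unknown fields during deserialization (unless #[serde(deny_unknown_fields)] is set), which is what makes older binaries tolerate newer specs. A self-contained sketch of that behavior, with a toy struct standing in for ComputeSpec:

use serde::Deserialize;

// Toy stand-in for ComputeSpec; only for illustration.
#[derive(Deserialize)]
struct Spec {
    format_version: f32,
}

fn main() {
    let json = r#"{ "format_version": 1.0, "unknown_field_123123123": "hello" }"#;
    // The unknown field is ignored rather than causing an error.
    let spec: Spec = serde_json::from_str(json).unwrap();
    assert_eq!(spec.format_version, 1.0);
}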

View File

@@ -23,7 +23,6 @@ use prometheus::{Registry, Result};
pub mod launch_timestamp;
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
pub mod metric_vec_duration;
pub type UIntGauge = GenericGauge<AtomicU64>;
pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;

View File

@@ -1,23 +0,0 @@
//! Helpers for observing duration on HistogramVec / CounterVec / GaugeVec / MetricVec<T>.
use std::{future::Future, time::Instant};
pub trait DurationResultObserver {
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration);
}
pub async fn observe_async_block_duration_by_result<
T,
E,
F: Future<Output = Result<T, E>>,
O: DurationResultObserver,
>(
observer: &O,
block: F,
) -> Result<T, E> {
let start = Instant::now();
let result = block.await;
let duration = start.elapsed();
observer.observe_result(&result, duration);
result
}
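The removed helper is generic over any observer; a minimal sketch of how it was meant to be called, with a toy observer (the real callers implemented DurationResultObserver for Prometheus metric vecs):

use std::future::Future;
use std::time::{Duration, Instant};

trait DurationResultObserver {
    fn observe_result<T, E>(&self, res: &Result<T, E>, duration: Duration);
}

// Toy observer; real implementations record into a HistogramVec or similar.
struct PrintObserver;

impl DurationResultObserver for PrintObserver {
    fn observe_result<T, E>(&self, res: &Result<T, E>, duration: Duration) {
        println!("ok={} took={:?}", res.is_ok(), duration);
    }
}

async fn observe<T, E, F, O>(observer: &O, block: F) -> Result<T, E>
where
    F: Future<Output = Result<T, E>>,
    O: DurationResultObserver,
{
    let start = Instant::now();
    let result = block.await;
    observer.observe_result(&result, start.elapsed());
    result
}

#[tokio::main]
async fn main() {
    let res: Result<u32, ()> = observe(&PrintObserver, async { Ok(42) }).await;
    assert_eq!(res, Ok(42));
}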

View File

@@ -70,14 +70,6 @@ impl RemotePath {
pub fn join(&self, segment: &Path) -> Self {
Self(self.0.join(segment))
}
pub fn get_path(&self) -> &PathBuf {
&self.0
}
pub fn extension(&self) -> Option<&str> {
self.0.extension()?.to_str()
}
}
/// Storage (potentially remote) API to manage its state.
@@ -94,19 +86,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError>;
/// Lists all files in a directory "recursively"
/// (not really recursively, because AWS has a flat namespace)
/// Note: This is subtly different from list_prefixes,
/// because it lists files instead of listing
/// names sharing common prefixes.
/// For example,
/// list_files("foo/bar") = ["foo/bar/cat123.txt",
/// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
/// whereas,
/// list_prefixes("foo/bar/") = ["cat", "dog"]
/// See `test_real_s3.rs` for more details.
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;
/// Streams the local file contents into remote into the remote storage entry.
async fn upload(
&self,
@@ -195,14 +174,6 @@ impl GenericRemoteStorage {
}
}
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
match self {
Self::LocalFs(s) => s.list_files(folder).await,
Self::AwsS3(s) => s.list_files(folder).await,
Self::Unreliable(s) => s.list_files(folder).await,
}
}
pub async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -48,14 +48,6 @@ impl LocalFs {
Ok(Self { storage_root })
}
// mirrors S3Bucket::s3_object_to_relative_path
fn local_file_to_relative_path(&self, key: PathBuf) -> RemotePath {
let relative_path = key
.strip_prefix(&self.storage_root)
.expect("relative path must contain storage_root as prefix");
RemotePath(relative_path.into())
}
async fn read_storage_metadata(
&self,
file_path: &Path,
@@ -140,34 +132,6 @@ impl RemoteStorage for LocalFs {
Ok(prefixes)
}
// recursively lists all files in a directory,
// mirroring the `list_files` for `s3_bucket`
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let full_path = match folder {
Some(folder) => folder.with_base(&self.storage_root),
None => self.storage_root.clone(),
};
let mut files = vec![];
let mut directory_queue = vec![full_path.clone()];
while !directory_queue.is_empty() {
let cur_folder = directory_queue
.pop()
.expect("queue cannot be empty: we just checked");
let mut entries = fs::read_dir(cur_folder.clone()).await?;
while let Some(entry) = entries.next_entry().await? {
let file_name: PathBuf = entry.file_name().into();
let full_file_name = cur_folder.clone().join(&file_name);
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
files.push(file_remote_path.clone());
if full_file_name.is_dir() {
directory_queue.push(full_file_name);
}
}
}
Ok(files)
}
async fn upload(
&self,
data: impl io::AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -34,8 +34,6 @@ use crate::{
Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;
pub(super) mod metrics {
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::Lazy;
@@ -347,51 +345,6 @@ impl RemoteStorage for S3Bucket {
Ok(document_keys)
}
/// See the doc for `RemoteStorage::list_files`
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let folder_name = folder
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
// AWS may need to break the response into several parts
let mut continuation_token = None;
let mut all_files = vec![];
loop {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 list_files")?;
metrics::inc_list_objects();
let response = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(folder_name.clone())
.set_continuation_token(continuation_token)
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.map_err(|e| {
metrics::inc_list_objects_fail();
e
})
.context("Failed to list files in S3 bucket")?;
for object in response.contents().unwrap_or_default() {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
all_files.push(remote_path);
}
match response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(all_files)
}
async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
@@ -471,33 +424,17 @@ impl RemoteStorage for S3Bucket {
delete_objects.push(obj_id);
}
for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) {
metrics::inc_delete_objects(chunk.len() as u64);
let resp = self
.client
.delete_objects()
.bucket(self.bucket_name.clone())
.delete(Delete::builder().set_objects(Some(chunk.to_vec())).build())
.send()
.await;
match resp {
Ok(resp) => {
if let Some(errors) = resp.errors {
metrics::inc_delete_objects_fail(errors.len() as u64);
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()
));
}
}
Err(e) => {
metrics::inc_delete_objects_fail(chunk.len() as u64);
return Err(e.into());
}
}
}
metrics::inc_delete_objects(paths.len() as u64);
self.client
.delete_objects()
.bucket(self.bucket_name.clone())
.delete(Delete::builder().set_objects(Some(delete_objects)).build())
.send()
.await
.map_err(|e| {
metrics::inc_delete_objects_fail(paths.len() as u64);
e
})?;
Ok(())
}
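The chunked variant above caps each request at MAX_DELETE_OBJECTS_REQUEST_SIZE because S3's DeleteObjects API accepts at most 1000 keys per call. A dependency-free sketch of that pattern; delete_batch is a hypothetical stand-in for client.delete_objects(...).send().await:

const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;

// Hypothetical stand-in for the SDK call; rejects over-sized batches
// the way the real service would.
fn delete_batch(batch: &[String]) -> Result<(), String> {
    if batch.len() > MAX_DELETE_OBJECTS_REQUEST_SIZE {
        return Err(format!("batch too large: {}", batch.len()));
    }
    Ok(())
}

fn delete_all(paths: &[String]) -> Result<(), String> {
    // chunks() never yields a slice longer than the limit.
    for chunk in paths.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) {
        delete_batch(chunk)?;
    }
    Ok(())
}

fn main() {
    let paths: Vec<String> = (0..2500).map(|i| format!("key{i}")).collect();
    assert!(delete_all(&paths).is_ok());
}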

View File

@@ -24,7 +24,6 @@ enum RemoteOp {
Upload(RemotePath),
Download(RemotePath),
Delete(RemotePath),
DeleteObjects(Vec<RemotePath>),
}
impl UnreliableWrapper {
@@ -83,11 +82,6 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list_prefixes(prefix).await
}
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
self.inner.list_files(folder).await
}
async fn upload(
&self,
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
@@ -127,18 +121,8 @@ impl RemoteStorage for UnreliableWrapper {
}
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
let mut error_counter = 0;
for path in paths {
if (self.delete(path).await).is_err() {
error_counter += 1;
}
}
if error_counter > 0 {
return Err(anyhow::anyhow!(
"failed to delete {} objects",
error_counter
));
self.delete(path).await?
}
Ok(())
}

View File

@@ -88,58 +88,6 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> any
Ok(())
}
/// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 cred env vars to be specified. The test skips the real code and passes if the env vars are not set.
/// See `s3_pagination_should_work` for more information.
///
/// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_s3_data`]
/// Then performs the following queries:
/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
#[test_context(MaybeEnabledS3WithSimpleTestBlobs)]
#[tokio::test]
async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> {
let ctx = match ctx {
MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx,
MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()),
MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => {
anyhow::bail!("S3 init failed: {e:?}")
}
};
let test_client = Arc::clone(&ctx.enabled.client);
let base_prefix =
RemotePath::new(Path::new("folder1")).context("common_prefix construction")?;
let root_files = test_client
.list_files(None)
.await
.context("client list root files failure")?
.into_iter()
.collect::<HashSet<_>>();
assert_eq!(
root_files,
ctx.remote_blobs.clone(),
"remote storage list_files on root mismatches with the uploads."
);
let nested_remote_files = test_client
.list_files(Some(&base_prefix))
.await
.context("client list nested files failure")?
.into_iter()
.collect::<HashSet<_>>();
let trim_remote_blobs: HashSet<_> = ctx
.remote_blobs
.iter()
.map(|x| x.get_path().to_str().expect("must be valid name"))
.filter(|x| x.starts_with("folder1"))
.map(|x| RemotePath::new(Path::new(x)).expect("must be valid name"))
.collect();
assert_eq!(
nested_remote_files, trim_remote_blobs,
"remote storage list_files on subdirrectory mismatches with the uploads."
);
Ok(())
}
#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
@@ -173,15 +121,10 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
let path2 = RemotePath::new(&PathBuf::from(format!("{}/path2", ctx.base_prefix,)))
.with_context(|| "RemotePath conversion")?;
let path3 = RemotePath::new(&PathBuf::from(format!("{}/path3", ctx.base_prefix,)))
.with_context(|| "RemotePath conversion")?;
let data1 = "remote blob data1".as_bytes();
let data1_len = data1.len();
let data2 = "remote blob data2".as_bytes();
let data2_len = data2.len();
let data3 = "remote blob data3".as_bytes();
let data3_len = data3.len();
ctx.client
.upload(std::io::Cursor::new(data1), data1_len, &path1, None)
.await?;
@@ -190,18 +133,8 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
.upload(std::io::Cursor::new(data2), data2_len, &path2, None)
.await?;
ctx.client
.upload(std::io::Cursor::new(data3), data3_len, &path3, None)
.await?;
ctx.client.delete_objects(&[path1, path2]).await?;
let prefixes = ctx.client.list_prefixes(None).await?;
assert_eq!(prefixes.len(), 1);
ctx.client.delete_objects(&[path3]).await?;
Ok(())
}
@@ -315,66 +248,6 @@ impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {
}
}
// NOTE: the setups for the list_prefixes test and the list_files test are very similar.
// However, they are not identical. The list_prefixes function is concerned with listing prefixes,
// whereas the list_files function is concerned with listing files.
// See the `RemoteStorage::list_files` documentation for more details.
enum MaybeEnabledS3WithSimpleTestBlobs {
Enabled(S3WithSimpleTestBlobs),
Disabled,
UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
}
struct S3WithSimpleTestBlobs {
enabled: EnabledS3,
remote_blobs: HashSet<RemotePath>,
}
#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs {
async fn setup() -> Self {
ensure_logging_ready();
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
info!(
"`{}` env variable is not set, skipping the test",
ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
);
return Self::Disabled;
}
let max_keys_in_list_response = 10;
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
match upload_simple_s3_data(&enabled.client, upload_tasks_count).await {
ControlFlow::Continue(uploads) => {
info!("Remote objects created successfully");
Self::Enabled(S3WithSimpleTestBlobs {
enabled,
remote_blobs: uploads,
})
}
ControlFlow::Break(uploads) => Self::UploadsFailed(
anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
S3WithSimpleTestBlobs {
enabled,
remote_blobs: uploads,
},
),
}
}
async fn teardown(self) {
match self {
Self::Disabled => {}
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
}
}
}
}
fn create_s3_client(
max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
@@ -385,7 +258,7 @@ fn create_s3_client(
let random_prefix_part = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.context("random s3 test prefix part calculation")?
.as_nanos();
.as_millis();
let remote_storage_config = RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
max_sync_errors: NonZeroU32::new(5).unwrap(),
@@ -491,52 +364,3 @@ async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<
}
}
}
// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
async fn upload_simple_s3_data(
client: &Arc<GenericRemoteStorage>,
upload_tasks_count: usize,
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
info!("Creating {upload_tasks_count} S3 files");
let mut upload_tasks = JoinSet::new();
for i in 1..upload_tasks_count + 1 {
let task_client = Arc::clone(client);
upload_tasks.spawn(async move {
let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
let blob_path = RemotePath::new(&blob_path)
.with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
debug!("Creating remote item {i} at path {blob_path:?}");
let data = format!("remote blob data {i}").into_bytes();
let data_len = data.len();
task_client
.upload(std::io::Cursor::new(data), data_len, &blob_path, None)
.await?;
Ok::<_, anyhow::Error>(blob_path)
});
}
let mut upload_tasks_failed = false;
let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
while let Some(task_run_result) = upload_tasks.join_next().await {
match task_run_result
.context("task join failed")
.and_then(|task_result| task_result.context("upload task failed"))
{
Ok(upload_path) => {
uploaded_blobs.insert(upload_path);
}
Err(e) => {
error!("Upload task failed: {e:?}");
upload_tasks_failed = true;
}
}
}
if upload_tasks_failed {
ControlFlow::Break(uploaded_blobs)
} else {
ControlFlow::Continue(uploaded_blobs)
}
}
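The removed helper fans the uploads out with tokio::task::JoinSet and reports partial failure through ControlFlow. A minimal sketch of the same fan-out/collect pattern, with a toy task in place of the S3 upload:

use std::ops::ControlFlow;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut tasks = JoinSet::new();
    for i in 0..10u32 {
        tasks.spawn(async move {
            // Toy task: one deliberate failure in place of a flaky upload.
            if i == 7 {
                Err(format!("task {i} failed"))
            } else {
                Ok(i)
            }
        });
    }

    let mut succeeded = Vec::new();
    let mut any_failed = false;
    while let Some(joined) = tasks.join_next().await {
        match joined.expect("task join failed") {
            Ok(v) => succeeded.push(v),
            Err(e) => {
                eprintln!("{e}");
                any_failed = true;
            }
        }
    }

    // Break carries the partial results so the caller can still clean up.
    let outcome = if any_failed {
        ControlFlow::Break(succeeded)
    } else {
        ControlFlow::Continue(succeeded)
    };
    assert!(matches!(outcome, ControlFlow::Break(_)));
}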

View File

@@ -1,6 +1,5 @@
use hyper::{header, Body, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::error::Error as StdError;
use thiserror::Error;
use tracing::error;
@@ -16,7 +15,7 @@ pub enum ApiError {
Unauthorized(String),
#[error("NotFound: {0}")]
NotFound(Box<dyn StdError + Send + Sync + 'static>),
NotFound(anyhow::Error),
#[error("Conflict: {0}")]
Conflict(String),

View File

@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
async-stream.workspace = true
async-trait.workspace = true
byteorder.workspace = true

View File

@@ -495,50 +495,50 @@ fn start_pageserver(
Ok(())
},
);
}
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let background_jobs_barrier = background_jobs_barrier;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
// The actual size calculation does need downloads, and
// creates a child context with the right DownloadBehavior.
DownloadBehavior::Error,
);
task_mgr::spawn(
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
"consumption metrics collection",
true,
async move {
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
let cancel = task_mgr::shutdown_token();
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let background_jobs_barrier = background_jobs_barrier;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
// The actual size calculation does need downloads, and
// creates a child context with the right DownloadBehavior.
DownloadBehavior::Error,
);
task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
"consumption metrics collection",
true,
async move {
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => {}
};
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => {}
};
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))
.await?;
Ok(())
},
);
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))
.await?;
Ok(())
},
);
}
}
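The wait-then-start dance described in the comments above is a standard tokio pattern: race the shutdown token against the startup barrier before entering the main loop. A self-contained sketch, assuming the tokio and tokio-util crates:
use std::sync::Arc;
use tokio::sync::Barrier;
use tokio_util::sync::CancellationToken;
#[tokio::main]
async fn main() {
    let barrier = Arc::new(Barrier::new(2));
    let cancel = CancellationToken::new();
    let task = tokio::spawn({
        let barrier = Arc::clone(&barrier);
        let cancel = cancel.clone();
        async move {
            // First wait until background jobs are cleared to launch,
            // but bail out immediately if shutdown was requested.
            tokio::select! {
                _ = cancel.cancelled() => return,
                _ = barrier.wait() => {}
            }
            println!("metrics collection loop would start here");
        }
    });
    barrier.wait().await; // release the spawned task
    task.await.unwrap();
}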
// Spawn a task to listen for libpq connections. It will spawn further tasks

View File

@@ -96,12 +96,12 @@ pub mod defaults {
#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
[tenant_config]
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD}
#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
@@ -111,8 +111,7 @@ pub mod defaults {
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#gc_feedback = false
[remote_storage]
# [remote_storage]
"###
);

View File

@@ -186,8 +186,10 @@ paths:
schema:
$ref: "#/components/schemas/Error"
delete:
description: "Attempts to delete specified timeline. 500 and 409 errors should be retried"
description: "Attempts to delete specified timeline. On 500 errors should be retried"
responses:
"200":
description: Ok
"400":
description: Error when no tenant id found in path or no timeline id
content:
@@ -212,12 +214,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"409":
description: Deletion is already in progress, continue polling
content:
application/json:
schema:
$ref: "#/components/schemas/ConflictError"
"412":
description: Tenant is missing, or timeline has children
content:

View File

@@ -23,6 +23,7 @@ use super::models::{
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::disk_usage_eviction_task;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
@@ -34,7 +35,6 @@ use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use utils::{
auth::JwtAuth,
http::{
@@ -142,7 +142,7 @@ impl From<TenantMapInsertError> for ApiError {
impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)),
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
}
}
@@ -151,7 +151,7 @@ impl From<TenantStateError> for ApiError {
impl From<GetTenantError> for ApiError {
fn from(tse: GetTenantError) -> ApiError {
match tse {
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)),
e @ GetTenantError::NotActive(_) => {
// Why is this not `ApiError::NotFound`?
// Because we must be careful to never return 404 for a tenant if it does
@@ -169,7 +169,7 @@ impl From<SetNewTenantConfigError> for ApiError {
fn from(e: SetNewTenantConfigError) -> ApiError {
match e {
SetNewTenantConfigError::GetTenant(tid) => {
ApiError::NotFound(anyhow!("tenant {}", tid).into())
ApiError::NotFound(anyhow!("tenant {}", tid))
}
e @ SetNewTenantConfigError::Persist(_) => {
ApiError::InternalServerError(anyhow::Error::new(e))
@@ -182,12 +182,11 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
fn from(value: crate::tenant::DeleteTimelineError) -> Self {
use crate::tenant::DeleteTimelineError::*;
match value {
NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found")),
HasChildren(children) => ApiError::PreconditionFailed(
format!("Cannot delete timeline which has child timelines: {children:?}")
.into_boxed_str(),
),
a @ AlreadyInProgress => ApiError::Conflict(a.to_string()),
Other(e) => ApiError::InternalServerError(e),
}
}
@@ -328,17 +327,15 @@ async fn timeline_create_handler(
&ctx,
)
.await {
Ok(new_timeline) => {
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
Err(tenant::CreateTimelineError::AlreadyExists) => {
json_response(StatusCode::CONFLICT, ())
}
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
Ok(None) => json_response(StatusCode::CONFLICT, ()), // timeline already exists
Err(err) => Err(ApiError::InternalServerError(err)),
}
}
.instrument(info_span!("timeline_create", tenant = %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
@@ -400,7 +397,7 @@ async fn timeline_detail_handler(
let timeline = tenant
.get_timeline(timeline_id, false)
.map_err(|e| ApiError::NotFound(e.into()))?;
.map_err(ApiError::NotFound)?;
let timeline_info = build_timeline_info(
&timeline,
@@ -1064,7 +1061,7 @@ async fn timeline_download_remote_layers_handler_get(
let info = timeline
.get_download_all_remote_layers_task_info()
.context("task never started since last pageserver process start")
.map_err(|e| ApiError::NotFound(e.into()))?;
.map_err(ApiError::NotFound)?;
json_response(StatusCode::OK, info)
}
@@ -1075,7 +1072,7 @@ async fn active_timeline_of_active_tenant(
let tenant = mgr::get_tenant(tenant_id, true).await?;
tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))
.map_err(ApiError::NotFound)
}
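The recurring `.map_err(ApiError::NotFound)` cleanup in this file works because a tuple enum variant is itself a function, here of type `fn(anyhow::Error) -> ApiError`, so it can be passed point-free. A tiny sketch:
#[derive(Debug)]
enum ApiError {
    NotFound(String),
}
fn find_timeline(id: u32) -> Result<u32, String> {
    Err(format!("timeline {id} was not found"))
}
fn handler(id: u32) -> Result<u32, ApiError> {
    // The variant constructor replaces the closure |e| ApiError::NotFound(e).
    find_timeline(id).map_err(ApiError::NotFound)
}
fn main() {
    assert!(matches!(handler(7), Err(ApiError::NotFound(_))));
}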
async fn always_panic_handler(
@@ -1131,6 +1128,8 @@ async fn disk_usage_eviction_run(
freed_bytes: 0,
};
use crate::task_mgr::MGMT_REQUEST_RUNTIME;
let (tx, rx) = tokio::sync::oneshot::channel();
let state = get_state(&r);
@@ -1148,7 +1147,7 @@ async fn disk_usage_eviction_run(
let _g = cancel.drop_guard();
crate::task_mgr::spawn(
crate::task_mgr::BACKGROUND_RUNTIME.handle(),
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::DiskUsageEviction,
None,
None,

View File

@@ -148,17 +148,17 @@ async fn import_rel(
// because there is no guarantee about the order in which we are processing segments.
// ignore "relation already exists" error
//
// FIXME: Keep track of which relations we've already created?
// FIXME: use proper error type for this, instead of parsing the error message.
// Or better yet, keep track of which relations we've already created
// https://github.com/neondatabase/neon/issues/3309
if let Err(e) = modification
.put_rel_creation(rel, nblocks as u32, ctx)
.await
{
match e {
RelationError::AlreadyExists => {
debug!("Relation {} already exist. We must be extending it.", rel)
}
_ => return Err(e.into()),
if e.to_string().contains("already exists") {
debug!("relation {} already exists. we must be extending it", rel);
} else {
return Err(e);
}
}
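The FIXME above is about exactly how fragile this string matching is; a sketch of the typed-error alternative it points at, reusing the `RelationError` shape that appears elsewhere in this diff (function body hypothetical):
use thiserror::Error;
#[derive(Debug, Error)]
enum RelationError {
    #[error("relation already exists")]
    AlreadyExists,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
fn put_rel_creation() -> Result<(), RelationError> {
    Err(RelationError::AlreadyExists)
}
fn main() -> anyhow::Result<()> {
    // Match on the enum variant instead of parsing the Display string.
    match put_rel_creation() {
        Err(RelationError::AlreadyExists) => println!("already exists, extending it"),
        Err(e) => return Err(e.into()),
        Ok(()) => {}
    }
    Ok(())
}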

View File

@@ -1,4 +1,4 @@
use metrics::metric_vec_duration::DurationResultObserver;
use metrics::core::{AtomicU64, GenericCounter};
use metrics::{
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
@@ -95,19 +95,21 @@ static READ_NUM_FS_LAYERS: Lazy<HistogramVec> = Lazy::new(|| {
});
// Metrics collected on operations on the storage repository.
pub static RECONSTRUCT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
static RECONSTRUCT_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_getpage_reconstruct_seconds",
"Time spent in reconstruct_value (reconstruct a page from deltas)",
"Time spent in reconstruct_value",
&["tenant_id", "timeline_id"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});
pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_materialized_cache_hits_direct_total",
"Number of cache hits from materialized page cache without redo",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric")
});
@@ -122,74 +124,15 @@ static GET_RECONSTRUCT_DATA_TIME: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_materialized_cache_hits_total",
"Number of cache hits from materialized page cache",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_read_accesses_total",
"Number of read accesses to the page cache",
&["key_kind"]
)
.expect("failed to define a metric")
});
pub static PAGE_CACHE_READ_ACCESSES_MATERIALIZED_PAGE: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["materialized_page"])
.unwrap()
});
pub static PAGE_CACHE_READ_ACCESSES_EPHEMERAL: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["ephemeral"])
.unwrap()
});
pub static PAGE_CACHE_READ_ACCESSES_IMMUTABLE: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["immutable"])
.unwrap()
});
pub static PAGE_CACHE_READ_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_read_hits_total",
"Number of read accesses to the page cache that hit",
&["key_kind", "hit_kind"]
)
.expect("failed to define a metric")
});
pub static PAGE_CACHE_READ_HITS_EPHEMERAL: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["ephemeral", "-"])
.unwrap()
});
pub static PAGE_CACHE_READ_HITS_IMMUTABLE: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["immutable", "-"])
.unwrap()
});
pub static PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_EXACT: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["materialized_page", "exact"])
.unwrap()
});
pub static PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_OLDER_LSN: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["materialized_page", "older_lsn"])
.unwrap()
});
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_wait_lsn_seconds",
@@ -485,27 +428,6 @@ pub static SMGR_QUERY_TIME: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub struct BasebackupQueryTime(HistogramVec);
pub static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|| {
BasebackupQueryTime({
register_histogram_vec!(
"pageserver_basebackup_query_seconds",
"Histogram of basebackup queries durations, by result type",
&["result"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
})
});
impl DurationResultObserver for BasebackupQueryTime {
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration) {
let label_value = if res.is_ok() { "ok" } else { "error" };
let metric = self.0.get_metric_with_label_values(&[label_value]).unwrap();
metric.observe(duration.as_secs_f64());
}
}
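For reference, the observer above is driven by a generic timing helper (`observe_async_block_duration_by_result`, visible at its call site later in this diff). A self-contained sketch of that pattern, with the helper body simplified:
use std::time::{Duration, Instant};
trait DurationResultObserver {
    fn observe_result<T, E>(&self, res: &Result<T, E>, duration: Duration);
}
struct PrintObserver;
impl DurationResultObserver for PrintObserver {
    fn observe_result<T, E>(&self, res: &Result<T, E>, duration: Duration) {
        // Bucket by result, mirroring the "ok"/"error" label above.
        let label = if res.is_ok() { "ok" } else { "error" };
        println!("observed {label} request in {duration:?}");
    }
}
async fn observe_async_block_duration_by_result<T, E, F, O>(observer: &O, fut: F) -> Result<T, E>
where
    F: std::future::Future<Output = Result<T, E>>,
    O: DurationResultObserver,
{
    let started = Instant::now();
    let res = fut.await;
    observer.observe_result(&res, started.elapsed());
    res
}
#[tokio::main]
async fn main() {
    let _ = observe_async_block_duration_by_result(&PrintObserver, async { Ok::<_, ()>(1) }).await;
}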
pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_live_connections",
@@ -674,79 +596,6 @@ pub static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_STARTED_COUNT: Lazy<IntCounter> =
Lazy::new(|| {
register_int_counter!(
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_started_count",
"Number of spawn_blocking calls made in Layer::get_value_reconstruct_data"
)
.expect("failed to define a metric")
});
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE: Lazy<IntGauge> =
Lazy::new(|| {
register_int_gauge!(
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_active_gauge",
"Number of spawn_blocking calls active in Layer::get_value_reconstruct_data"
)
.expect("failed to define a metric")
});
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_QUEUE_DELAY: Lazy<Histogram> = Lazy::new(
|| {
register_histogram!(
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_queue_delay_seconds",
"Time a Layer::get_value_reconstruct_data call spends in spawn_blocking queue until the first line of blockign code runs inside spawn_blocking",
vec![
0.000_005, 0.000_010, 0.000_025, 0.000_050, 0.000_100, 0.000_250, 0.000_500, 0.001_000,
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000, 0.100_000, 0.250_000, 0.500_000,
1.000_000, 2.000_000, 5.000_000, 10.000_000, 25.000_000, 50.000_000, 100.000_000,
],
)
.expect("failed to define a metric")
},
);
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_layer_get_value_reconstruct_data_completion_time_seconds",
"Time a Layer::get_value_reconstruct_data call takes to complete",
&["result"],
vec![
0.000_005,
0.000_010,
0.000_025,
0.000_050,
0.000_100,
0.000_250,
0.000_500,
0.001_000,
0.002_500,
0.005_000,
0.010_000,
0.025_000,
0.050_000,
0.100_000,
0.250_000,
0.500_000,
1.000_000,
2.000_000,
5.000_000,
10.000_000,
25.000_000,
50.000_000,
100.000_000,
]
)
.expect("failed to define a metric")
});
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_OK: Lazy<Histogram> =
Lazy::new(|| LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME.with_label_values(&["ok"]));
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_ERROR: Lazy<Histogram> =
Lazy::new(|| LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME.with_label_values(&["error"]));
// Metrics collected on WAL redo operations
//
// We collect the time spent in actual WAL redo ('redo'), and time waiting
@@ -903,7 +752,10 @@ impl StorageTimeMetrics {
pub struct TimelineMetrics {
tenant_id: String,
timeline_id: String,
pub reconstruct_time_histo: Histogram,
pub get_reconstruct_data_time_histo: Histogram,
pub materialized_page_cache_hit_counter: GenericCounter<AtomicU64>,
pub materialized_page_cache_hit_upon_request_counter: GenericCounter<AtomicU64>,
pub flush_time_histo: StorageTimeMetrics,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
@@ -931,9 +783,15 @@ impl TimelineMetrics {
) -> Self {
let tenant_id = tenant_id.to_string();
let timeline_id = timeline_id.to_string();
let reconstruct_time_histo = RECONSTRUCT_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let flush_time_histo =
StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id);
let compact_time_histo =
@@ -975,13 +833,19 @@ impl TimelineMetrics {
let read_num_fs_layers = READ_NUM_FS_LAYERS
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let materialized_page_cache_hit_upon_request_counter = MATERIALIZED_PAGE_CACHE_HIT_DIRECT
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let evictions_with_low_residence_duration =
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
TimelineMetrics {
tenant_id,
timeline_id,
reconstruct_time_histo,
get_reconstruct_data_time_histo,
materialized_page_cache_hit_counter,
materialized_page_cache_hit_upon_request_counter,
flush_time_histo,
compact_time_histo,
create_images_time_histo,
@@ -1008,7 +872,10 @@ impl Drop for TimelineMetrics {
fn drop(&mut self) {
let tenant_id = &self.tenant_id;
let timeline_id = &self.timeline_id;
let _ = RECONSTRUCT_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = MATERIALIZED_PAGE_CACHE_HIT.remove_label_values(&[tenant_id, timeline_id]);
let _ = MATERIALIZED_PAGE_CACHE_HIT_DIRECT.remove_label_values(&[tenant_id, timeline_id]);
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]);
let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
@@ -1452,8 +1319,4 @@ pub fn preinitialize_metrics() {
// Same as above for this metric, but, it's a Vec-type metric for which we don't know all the labels.
BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT.reset();
// Python tests need these.
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
MATERIALIZED_PAGE_CACHE_HIT.get();
}
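The pattern running through this file: per-timeline handles are resolved once from a labeled vec at construction, and the label set is dropped again in `Drop`, so deleted timelines stop exporting series. A reduced sketch using the prometheus crate directly (metric name hypothetical):
use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounter, IntCounterVec};
static CACHE_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "cache_hits_total",
        "Number of cache hits",
        &["tenant_id", "timeline_id"]
    )
    .expect("failed to define a metric")
});
struct TimelineMetrics {
    tenant_id: String,
    timeline_id: String,
    cache_hits: IntCounter,
}
impl TimelineMetrics {
    fn new(tenant_id: &str, timeline_id: &str) -> Self {
        // Resolve the labeled child once; hot paths then use a plain counter.
        let cache_hits = CACHE_HITS.with_label_values(&[tenant_id, timeline_id]);
        Self {
            tenant_id: tenant_id.into(),
            timeline_id: timeline_id.into(),
            cache_hits,
        }
    }
}
impl Drop for TimelineMetrics {
    fn drop(&mut self) {
        // Drop the label set so a deleted timeline stops exporting series.
        let _ = CACHE_HITS.remove_label_values(&[&self.tenant_id, &self.timeline_id]);
    }
}
fn main() {
    let m = TimelineMetrics::new("t1", "tl1");
    m.cache_hits.inc();
    drop(m);
}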

View File

@@ -313,8 +313,6 @@ impl PageCache {
key: &Key,
lsn: Lsn,
) -> Option<(Lsn, PageReadGuard)> {
crate::metrics::PAGE_CACHE_READ_ACCESSES_MATERIALIZED_PAGE.inc();
let mut cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_id,
@@ -325,17 +323,8 @@ impl PageCache {
};
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
} = cache_key
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_EXACT.inc();
} else {
crate::metrics::PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_OLDER_LSN.inc();
}
Some((available_lsn, guard))
if let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key {
Some((lsn, guard))
} else {
panic!("unexpected key type in slot");
}
@@ -510,31 +499,11 @@ impl PageCache {
/// ```
///
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::EphemeralPage { .. } => (
&crate::metrics::PAGE_CACHE_READ_ACCESSES_EPHEMERAL,
&crate::metrics::PAGE_CACHE_READ_HITS_EPHEMERAL,
),
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE_READ_ACCESSES_IMMUTABLE,
&crate::metrics::PAGE_CACHE_READ_HITS_IMMUTABLE,
),
};
read_access.inc();
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
if is_first_iteration {
hit.inc();
}
return Ok(ReadBufResult::Found(read_guard));
}
is_first_iteration = false;
// Not found. Find a victim buffer
let (slot_idx, mut inner) =

View File

@@ -390,9 +390,7 @@ impl PageServerHandler {
};
// Check that the timeline exists
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| anyhow::anyhow!(e))?;
let timeline = tenant.get_timeline(timeline_id, true)?;
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
@@ -491,7 +489,9 @@ impl PageServerHandler {
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
@@ -904,7 +904,7 @@ where
self.check_permission(Some(tenant_id))?;
let lsn = if params.len() >= 3 {
let lsn = if params.len() == 3 {
Some(
Lsn::from_str(params[2])
.with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
@@ -913,24 +913,10 @@ where
None
};
metrics::metric_vec_duration::observe_async_block_duration_by_result(
&*crate::metrics::BASEBACKUP_QUERY_TIME,
async move {
self.handle_basebackup_request(
pgb,
tenant_id,
timeline_id,
lsn,
None,
false,
ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
anyhow::Ok(())
},
)
.await?;
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
// return pair of prev_lsn and last_lsn
else if query_string.starts_with("get_last_record_rlsn ") {
@@ -1246,6 +1232,6 @@ async fn get_active_tenant_timeline(
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
.map_err(GetActiveTimelineError::Timeline)?;
Ok(timeline)
}

View File

@@ -43,16 +43,6 @@ pub enum CalculateLogicalSizeError {
Other(#[from] anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
#[error("Relation Already Exists")]
AlreadyExists,
#[error("invalid relnode")]
InvalidRelnode,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
///
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
@@ -111,9 +101,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
}
let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
@@ -158,9 +148,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
@@ -203,9 +193,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
}
// first try to lookup relation in cache
@@ -734,7 +724,7 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
Ok(())
}
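One subtlety in the `ensure!` changes through this file: with a typed error value the caller can still `downcast_ref` to it, while a plain message only carries text. A sketch of the difference, assuming the anyhow and thiserror crates:
use anyhow::ensure;
use thiserror::Error;
#[derive(Debug, Error)]
#[error("invalid relnode")]
struct InvalidRelnode;
fn with_type(relnode: u32) -> anyhow::Result<()> {
    ensure!(relnode != 0, InvalidRelnode);
    Ok(())
}
fn with_message(relnode: u32) -> anyhow::Result<()> {
    ensure!(relnode != 0, "invalid relnode");
    Ok(())
}
fn main() {
    // The typed form can be recovered by downcasting...
    assert!(with_type(0).unwrap_err().downcast_ref::<InvalidRelnode>().is_some());
    // ...while the message form only carries the formatted text.
    assert!(with_message(0).unwrap_err().downcast_ref::<InvalidRelnode>().is_none());
}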
@@ -761,7 +751,7 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
}
@@ -885,38 +875,32 @@ impl<'a> DatadirModification<'a> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> Result<(), RelationError> {
if rel.relnode == 0 {
return Err(RelationError::InvalidRelnode);
}
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
// Didn't exist. Update dbdir
dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
let buf = DbDirectory::ser(&dbdir)?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
anyhow::bail!("rel {rel} already exists");
}
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
);
// Put size
@@ -941,7 +925,7 @@ impl<'a> DatadirModification<'a> {
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
let last_lsn = self.tline.get_last_record_lsn();
if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
let size_key = rel_size_to_key(rel);
@@ -972,7 +956,7 @@ impl<'a> DatadirModification<'a> {
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
// Put size
let size_key = rel_size_to_key(rel);
@@ -993,7 +977,7 @@ impl<'a> DatadirModification<'a> {
/// Drop a relation.
pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
// Remove it from the directory entry
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);

View File

@@ -102,33 +102,11 @@ use crate::shutdown_pageserver;
// It's also good to avoid hogging all the threads that would be needed to process
// other operations, e.g. if the upload tasks get blocked on locks. That shouldn't
// happen, but still.
static PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE: Lazy<Option<usize>> = Lazy::new(|| {
let env_var: String = match std::env::var("PAGESERVER_TOKIO_MAX_BLOCKING_THREADS") {
Ok(v) => v,
Err(std::env::VarError::NotPresent) => {
debug!("env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS not set, using default");
return None;
}
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS is not valid UTF-8");
}
};
let pool_size = match env_var.parse() {
Ok(v) => v,
Err(e) => {
panic!("Failed to parse PAGESERVER_TOKIO_MAX_BLOCKING_THREADS: {e:?}");
}
};
eprintln!("using spawn_blocking pool size override from env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS: {pool_size:?}");
Some(pool_size)
});
//
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("compute request worker")
.enable_all()
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
.build()
.expect("Failed to create compute request runtime")
});
@@ -137,7 +115,6 @@ pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("mgmt request worker")
.enable_all()
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
.build()
.expect("Failed to create mgmt request runtime")
});
@@ -146,7 +123,6 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("walreceiver worker")
.enable_all()
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
.build()
.expect("Failed to create walreceiver runtime")
});
@@ -155,7 +131,6 @@ pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background op worker")
.enable_all()
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
.build()
.expect("Failed to create background op runtime")
});
@@ -531,17 +506,17 @@ pub async fn shutdown_tasks(
warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
}
let join_handle = tokio::select! {
let completed = tokio::select! {
biased;
_ = &mut join_handle => { None },
_ = &mut join_handle => { true },
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for {} to shut down", task.name);
Some(join_handle)
false
}
};
if let Some(join_handle) = join_handle {
if !completed {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)

View File

@@ -85,7 +85,9 @@ pub mod blob_io;
pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_cache;
pub mod layer_map;
pub mod layer_map_mgr;
pub mod manifest;
pub mod metadata;
@@ -421,32 +423,12 @@ remote:
}
}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum GetTimelineError {
#[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
NotActive {
tenant_id: TenantId,
timeline_id: TimelineId,
state: TimelineState,
},
#[error("Timeline {tenant_id}/{timeline_id} was not found")]
NotFound {
tenant_id: TenantId,
timeline_id: TimelineId,
},
}
#[derive(Debug, thiserror::Error)]
pub enum DeleteTimelineError {
#[error("NotFound")]
NotFound,
#[error("HasChildren")]
HasChildren(Vec<TimelineId>),
#[error("Timeline deletion is already in progress")]
AlreadyInProgress,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@@ -493,22 +475,6 @@ pub(crate) enum ShutdownError {
AlreadyStopping,
}
struct DeletionGuard(OwnedMutexGuard<bool>);
impl DeletionGuard {
fn is_deleted(&self) -> bool {
*self.0
}
}
#[derive(thiserror::Error, Debug)]
pub enum CreateTimelineError {
#[error("a timeline with the given ID already exists")]
AlreadyExists,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
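This diff removes the typed `CreateTimelineError` in favor of an `Ok(None)` sentinel on the already-exists path (see the `create_timeline` hunks below). A compact sketch of the two shapes side by side, with hypothetical bodies; the typed variant is what lets an HTTP handler map already-exists to 409 Conflict without string inspection:
use thiserror::Error;
#[derive(Debug, Error)]
enum CreateTimelineError {
    #[error("a timeline with the given ID already exists")]
    AlreadyExists,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
// Typed-error shape: already-exists is a distinct, matchable variant.
fn create_typed(exists: bool) -> Result<u32, CreateTimelineError> {
    if exists {
        return Err(CreateTimelineError::AlreadyExists);
    }
    Ok(1)
}
// Sentinel shape: the same case becomes Ok(None); only real failures are Err.
fn create_sentinel(exists: bool) -> anyhow::Result<Option<u32>> {
    if exists {
        return Ok(None);
    }
    Ok(Some(1))
}
fn main() {
    assert!(matches!(create_typed(true), Err(CreateTimelineError::AlreadyExists)));
    assert!(matches!(create_sentinel(true), Ok(None)));
}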
impl Tenant {
/// Yet another helper for timeline initialization.
/// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
@@ -591,17 +557,10 @@ impl Tenant {
.context("failed to reconcile with remote")?
}
let layers = timeline.layer_mgr.read();
// Sanity check: a timeline should have some content.
anyhow::ensure!(
ancestor.is_some()
|| timeline
.layers
.read()
.await
.0
.iter_historic_layers()
.next()
.is_some(),
ancestor.is_some() || layers.iter_historic_layers().next().is_some(),
"Timeline has no ancestor and no layer files"
);
@@ -975,117 +934,6 @@ impl Tenant {
tenant
}
pub fn scan_and_sort_timelines_dir(
self: Arc<Tenant>,
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
for entry in
std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
{
let entry = entry.context("read timeline dir entry")?;
let timeline_dir = entry.path();
if crate::is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
);
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
);
}
} else if is_uninit_mark(&timeline_dir) {
if !timeline_dir.exists() {
warn!(
"Timeline dir entry become invalid: {}",
timeline_dir.display()
);
continue;
}
let timeline_uninit_mark_file = &timeline_dir;
info!(
"Found an uninit mark file {}, removing the timeline and its uninit mark",
timeline_uninit_mark_file.display()
);
let timeline_id = timeline_uninit_mark_file
.file_stem()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline uninit mark name {}",
timeline_uninit_mark_file.display()
)
})?;
let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
} else {
if !timeline_dir.exists() {
warn!(
"Timeline dir entry become invalid: {}",
timeline_dir.display()
);
continue;
}
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline dir name {}",
timeline_dir.display()
)
})?;
let timeline_uninit_mark_file = self
.conf
.timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
if timeline_uninit_mark_file.exists() {
info!(
%timeline_id,
"Found an uninit mark file, removing the timeline and its uninit mark",
);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
continue;
}
let file_name = entry.file_name();
if let Ok(timeline_id) =
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
{
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!(
"unexpected file or directory in timelines directory: {}",
file_name.to_string_lossy()
);
}
}
}
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
tree_sort_timelines(timelines_to_load)
}
///
/// Background task to load in-memory data structures for this tenant, from
/// files on disk. Used at pageserver startup.
@@ -1102,16 +950,110 @@ impl Tenant {
utils::failpoint_sleep_millis_async!("before-loading-tenant");
// TODO split this into two functions, scan and actual load
// Load in-memory state to reflect the local files on disk
//
// Scan the directory, peek into the metadata file of each timeline, and
// collect a list of timelines and their ancestors.
let tenant_id = self.tenant_id;
let conf = self.conf;
let span = info_span!("blocking");
let cloned = Arc::clone(self);
let sorted_timelines: Vec<(_, _)> = tokio::task::spawn_blocking(move || {
let _g = span.entered();
cloned.scan_and_sort_timelines_dir()
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
let timelines_dir = conf.timelines_path(&tenant_id);
for entry in
std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
{
let entry = entry.context("read timeline dir entry")?;
let timeline_dir = entry.path();
if crate::is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
);
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
);
}
} else if is_uninit_mark(&timeline_dir) {
let timeline_uninit_mark_file = &timeline_dir;
info!(
"Found an uninit mark file {}, removing the timeline and its uninit mark",
timeline_uninit_mark_file.display()
);
let timeline_id = timeline_uninit_mark_file
.file_stem()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline uninit mark name {}",
timeline_uninit_mark_file.display()
)
})?;
let timeline_dir = conf.timeline_path(&timeline_id, &tenant_id);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
} else {
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline dir name {}",
timeline_dir.display()
)
})?;
let timeline_uninit_mark_file =
conf.timeline_uninit_mark_file_path(tenant_id, timeline_id);
if timeline_uninit_mark_file.exists() {
info!(
%timeline_id,
"Found an uninit mark file, removing the timeline and its uninit mark",
);
if let Err(e) = remove_timeline_and_uninit_mark(
&timeline_dir,
&timeline_uninit_mark_file,
) {
error!("Failed to clean up uninit marked timeline: {e:?}");
}
continue;
}
let file_name = entry.file_name();
if let Ok(timeline_id) =
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
{
let metadata = load_metadata(conf, timeline_id, tenant_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!(
"unexpected file or directory in timelines directory: {}",
file_name.to_string_lossy()
);
}
}
}
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
tree_sort_timelines(timelines_to_load)
})
.await
.context("load spawn_blocking")
@@ -1192,11 +1134,7 @@ impl Tenant {
)
.context("create_timeline_struct")?;
let guard = DeletionGuard(
Arc::clone(&timeline.delete_lock)
.try_lock_owned()
.expect("cannot happen because we're the only owner"),
);
let guard = Arc::clone(&timeline.delete_lock).lock_owned().await;
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -1259,21 +1197,19 @@ impl Tenant {
&self,
timeline_id: TimelineId,
active_only: bool,
) -> Result<Arc<Timeline>, GetTimelineError> {
) -> anyhow::Result<Arc<Timeline>> {
let timelines_accessor = self.timelines.lock().unwrap();
let timeline = timelines_accessor
.get(&timeline_id)
.ok_or(GetTimelineError::NotFound {
tenant_id: self.tenant_id,
timeline_id,
})?;
let timeline = timelines_accessor.get(&timeline_id).with_context(|| {
format!("Timeline {}/{} was not found", self.tenant_id, timeline_id)
})?;
if active_only && !timeline.is_active() {
Err(GetTimelineError::NotActive {
tenant_id: self.tenant_id,
anyhow::bail!(
"Timeline {}/{} is not active, state: {:?}",
self.tenant_id,
timeline_id,
state: timeline.current_state(),
})
timeline.current_state()
)
} else {
Ok(Arc::clone(timeline))
}
@@ -1305,7 +1241,7 @@ impl Tenant {
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
/// minimum amount of keys required to get a writable timeline.
/// (Without it, `put` might fail due to `repartition` failing.)
pub fn create_empty_timeline(
pub async fn create_empty_timeline(
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
@@ -1317,9 +1253,11 @@ impl Tenant {
"Cannot create empty timelines on inactive tenant"
);
let timelines = self.timelines.lock().unwrap();
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
drop(timelines);
let timeline_uninit_mark = {
let timelines: MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>> =
self.timelines.lock().unwrap();
self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
};
let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to
@@ -1339,6 +1277,7 @@ impl Tenant {
initdb_lsn,
None,
)
.await
}
/// Helper for unit tests to create an empty timeline.
@@ -1354,7 +1293,9 @@ impl Tenant {
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
let uninit_tl = self
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
.await?;
let tline = uninit_tl.raw_timeline().expect("we just created it");
assert_eq!(tline.get_last_record_lsn(), Lsn(0));
@@ -1383,7 +1324,8 @@ impl Tenant {
/// Returns the new timeline ID and reference to its Timeline object.
///
/// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
/// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
/// the same timeline ID already exists, returns None. If `new_timeline_id` is not given,
/// a new unique ID is generated.
pub async fn create_timeline(
&self,
new_timeline_id: TimelineId,
@@ -1392,12 +1334,11 @@ impl Tenant {
pg_version: u32,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
if !self.is_active() {
return Err(CreateTimelineError::Other(anyhow::anyhow!(
"Cannot create timelines on inactive tenant"
)));
}
) -> anyhow::Result<Option<Arc<Timeline>>> {
anyhow::ensure!(
self.is_active(),
"Cannot create timelines on inactive tenant"
);
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
debug!("timeline {new_timeline_id} already exists");
@@ -1417,7 +1358,7 @@ impl Tenant {
.context("wait for timeline uploads to complete")?;
}
return Err(CreateTimelineError::AlreadyExists);
return Ok(None);
}
let loaded_timeline = match ancestor_timeline_id {
@@ -1432,12 +1373,12 @@ impl Tenant {
let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
if ancestor_ancestor_lsn > *lsn {
// can we safely just branch from the ancestor instead?
return Err(CreateTimelineError::Other(anyhow::anyhow!(
bail!(
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
lsn,
ancestor_timeline_id,
ancestor_ancestor_lsn,
)));
);
}
// Wait for the WAL to arrive and be processed on the parent branch up
@@ -1471,7 +1412,7 @@ impl Tenant {
})?;
}
Ok(loaded_timeline)
Ok(Some(loaded_timeline))
}
/// perform one garbage collection iteration, removing old data files from disk.
@@ -1522,13 +1463,7 @@ impl Tenant {
let timelines = self.timelines.lock().unwrap();
let timelines_to_compact = timelines
.iter()
.filter_map(|(timeline_id, timeline)| {
if timeline.is_active() {
Some((*timeline_id, timeline.clone()))
} else {
None
}
})
.map(|(timeline_id, timeline)| (*timeline_id, timeline.clone()))
.collect::<Vec<_>>();
drop(timelines);
timelines_to_compact
@@ -1609,7 +1544,6 @@ impl Tenant {
&self,
timeline_id: TimelineId,
timeline: Arc<Timeline>,
guard: DeletionGuard,
) -> anyhow::Result<()> {
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
@@ -1625,7 +1559,7 @@ impl Tenant {
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
let layer_removal_guard = timeline.lcache.delete_guard().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
@@ -1682,25 +1616,6 @@ impl Tenant {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
});
if let Some(remote_client) = &timeline.remote_client {
remote_client.delete_all().await.context("delete_all")?
};
// Have a failpoint that can use the `pause` failpoint action.
// We don't want to block the executor thread, hence, spawn_blocking + await.
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint in_progress_delete");
fail::fail_point!("in_progress_delete");
}
})
.await
.expect("spawn_blocking");
}
{
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
@@ -1721,7 +1636,12 @@ impl Tenant {
drop(timelines);
}
drop(guard);
let remote_client = match &timeline.remote_client {
Some(remote_client) => remote_client,
None => return Ok(()),
};
remote_client.delete_all().await?;
Ok(())
}
@@ -1769,15 +1689,23 @@ impl Tenant {
timeline = Arc::clone(timeline_entry.get());
// Prevent two tasks from trying to delete the timeline at the same time.
delete_lock_guard = DeletionGuard(
//
// XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller
// needs to poll until the operation has finished. But for now, we return an
// error, because the control plane knows to retry errors.
delete_lock_guard =
Arc::clone(&timeline.delete_lock)
.try_lock_owned()
.map_err(|_| DeleteTimelineError::AlreadyInProgress)?,
);
.map_err(|_| {
DeleteTimelineError::Other(anyhow::anyhow!(
"timeline deletion is already in progress"
))
})?;
// If another task finished the deletion just before we acquired the lock,
// return success.
if delete_lock_guard.is_deleted() {
if *delete_lock_guard {
return Ok(());
}
@@ -1851,7 +1779,7 @@ impl Tenant {
self: Arc<Self>,
timeline_id: TimelineId,
timeline: Arc<Timeline>,
guard: DeletionGuard,
_guard: OwnedMutexGuard<bool>,
) {
let tenant_id = self.tenant_id;
let timeline_clone = Arc::clone(&timeline);
@@ -1864,7 +1792,7 @@ impl Tenant {
"timeline_delete",
false,
async move {
if let Err(err) = self.delete_timeline(timeline_id, timeline, guard).await {
if let Err(err) = self.delete_timeline(timeline_id, timeline).await {
error!("Error: {err:#}");
timeline_clone.set_broken(err.to_string())
};
@@ -2827,13 +2755,15 @@ impl Tenant {
src_timeline.pg_version,
);
let uninitialized_timeline = self.prepare_new_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
)?;
let uninitialized_timeline = self
.prepare_new_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
)
.await?;
let new_timeline = uninitialized_timeline.finish_creation()?;
@@ -2911,13 +2841,15 @@ impl Tenant {
pgdata_lsn,
pg_version,
);
let raw_timeline = self.prepare_new_timeline(
timeline_id,
&new_metadata,
timeline_uninit_mark,
pgdata_lsn,
None,
)?;
let raw_timeline = self
.prepare_new_timeline(
timeline_id,
&new_metadata,
timeline_uninit_mark,
pgdata_lsn,
None,
)
.await?;
let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
@@ -2970,7 +2902,7 @@ impl Tenant {
/// at 'disk_consistent_lsn'. After any initial data has been imported, call
/// `finish_creation` to insert the Timeline into the timelines map and to remove the
/// uninit mark file.
fn prepare_new_timeline(
async fn prepare_new_timeline(
&self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
@@ -2997,7 +2929,7 @@ impl Tenant {
.create_timeline_struct(new_timeline_id, new_metadata, ancestor, remote_client, None)
.context("Failed to create timeline data structure")?;
timeline_struct.init_empty_layer_map(start_lsn);
timeline_struct.init_empty_layer_map(start_lsn).await?;
if let Err(e) =
self.create_timeline_files(&uninit_mark.timeline_path, new_timeline_id, new_metadata)
@@ -3420,8 +3352,9 @@ where
#[cfg(test)]
pub mod harness {
use bytes::{Bytes, BytesMut};
use once_cell::sync::Lazy;
use once_cell::sync::OnceCell;
use std::sync::Arc;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{fs, path::PathBuf};
use utils::logging;
use utils::lsn::Lsn;
@@ -3454,6 +3387,8 @@ pub mod harness {
buf.freeze()
}
static LOCK: Lazy<RwLock<()>> = Lazy::new(|| RwLock::new(()));
impl From<TenantConf> for TenantConfOpt {
fn from(tenant_conf: TenantConf) -> Self {
Self {
@@ -3480,16 +3415,33 @@ pub mod harness {
}
}
pub struct TenantHarness {
pub struct TenantHarness<'a> {
pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf,
pub tenant_id: TenantId,
pub lock_guard: (
Option<RwLockReadGuard<'a, ()>>,
Option<RwLockWriteGuard<'a, ()>>,
),
}
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
impl TenantHarness {
impl<'a> TenantHarness<'a> {
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
Self::create_internal(test_name, false)
}
pub fn create_exclusive(test_name: &'static str) -> anyhow::Result<Self> {
Self::create_internal(test_name, true)
}
fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result<Self> {
let lock_guard = if exclusive {
(None, Some(LOCK.write().unwrap()))
} else {
(Some(LOCK.read().unwrap()), None)
};
LOG_HANDLE.get_or_init(|| {
logging::init(
logging::LogFormat::Test,
@@ -3525,6 +3477,7 @@ pub mod harness {
conf,
tenant_conf,
tenant_id,
lock_guard,
})
}
@@ -3549,12 +3502,26 @@ pub mod harness {
self.tenant_id,
None,
));
// populate tenant with locally available timelines
let mut timelines_to_load = HashMap::new();
for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id))
.expect("should be able to read timelines dir")
{
let timeline_dir_entry = timeline_dir_entry?;
let timeline_id: TimelineId = timeline_dir_entry
.path()
.file_name()
.unwrap()
.to_string_lossy()
.parse()?;
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
timelines_to_load.insert(timeline_id, timeline_metadata);
}
tenant
.load(None, ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
.await?;
// TODO reuse Tenant::activate (needs broker)
tenant.state.send_replace(TenantState::Active);
for timeline in tenant.timelines.lock().unwrap().values() {
timeline.set_state(TimelineState::Active);
@@ -3659,7 +3626,10 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
match tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
{
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(
e.to_string(),
@@ -4080,13 +4050,9 @@ mod tests {
std::fs::write(metadata_path, metadata_bytes)?;
let err = harness.try_load(&ctx).await.err().expect("should fail");
// get the whole error chain with all .context, not only the last one
let message = format!("{err:#}");
let expected = "Failed to parse metadata bytes from path";
assert!(
message.contains(expected),
"message '{message}' expected to contain {expected}"
);
assert!(err
.to_string()
.starts_with("Failed to parse metadata bytes from path"));
let mut found_error_message = false;
let mut err_source = err.source();
@@ -4463,8 +4429,9 @@ mod tests {
.await;
let initdb_lsn = Lsn(0x20);
let utline =
tenant.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)?;
let utline = tenant
.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = utline.raw_timeline().unwrap();
// Spawn flush loop now so that we can set the `expect_initdb_optimization`
@@ -4520,44 +4487,6 @@ mod tests {
assert!(expect_initdb_optimization);
assert!(initdb_optimization_count > 0);
}
Ok(())
}
#[tokio::test]
async fn test_uninit_mark_crash() -> anyhow::Result<()> {
let name = "test_uninit_mark_crash";
let harness = TenantHarness::create(name)?;
{
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
// Keeps uninit mark in place
std::mem::forget(tline);
}
let (tenant, _) = harness.load().await;
match tenant.get_timeline(TIMELINE_ID, false) {
Ok(_) => panic!("timeline should've been removed during load"),
Err(e) => {
assert_eq!(
e,
GetTimelineError::NotFound {
tenant_id: tenant.tenant_id,
timeline_id: TIMELINE_ID,
}
)
}
}
assert!(!harness
.conf
.timeline_path(&TIMELINE_ID, &tenant.tenant_id)
.exists());
assert!(!harness
.conf
.timeline_uninit_mark_file_path(tenant.tenant_id, TIMELINE_ID)
.exists());
Ok(())
}

View File

@@ -38,8 +38,8 @@ pub mod defaults {
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}

View File

@@ -0,0 +1,143 @@
use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
use super::Timeline;
use crate::tenant::layer_map::{self, LayerMap};
use anyhow::Result;
use std::sync::{Mutex, Weak};
use std::{collections::HashMap, sync::Arc};
pub struct LayerCache {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
pub layers_removal_lock: Arc<tokio::sync::Mutex<()>>,
/// We need this lock b/c we do not have any way to prevent GC/compaction from removing files that are in use.
/// Once we do reference counting on Arc to prevent that, we can safely remove this lock.
pub layers_operation_lock: Arc<tokio::sync::RwLock<()>>,
/// Will be useful when we move evict / download to layer cache.
#[allow(unused)]
timeline: Weak<Timeline>,
mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
}
pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>);
pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>);
#[derive(Clone)]
pub struct DeleteGuard(Arc<tokio::sync::OwnedMutexGuard<()>>);
impl LayerCache {
pub fn new(timeline: Weak<Timeline>) -> Self {
Self {
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())),
mapping: Mutex::new(HashMap::new()),
timeline,
}
}
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
let guard = self.mapping.lock().unwrap();
guard.get(&desc.key()).expect("not found").clone()
}
/// This function mimics the original behavior of the `layers` lock in `Timeline`. It can be removed once we ensure
/// we won't delete files that are being read.
pub async fn layer_in_use_write(&self) -> LayerInUseWrite {
LayerInUseWrite(self.layers_operation_lock.clone().write_owned().await)
}
/// This function mimics the original behavior of the `layers` lock in `Timeline`. It can be removed once we ensure
/// we won't delete files that are being read.
pub async fn layer_in_use_read(&self) -> LayerInUseRead {
LayerInUseRead(self.layers_operation_lock.clone().read_owned().await)
}
/// Ensures only one of compaction / gc can happen at a time.
pub async fn delete_guard(&self) -> DeleteGuard {
DeleteGuard(Arc::new(
self.layers_removal_lock.clone().lock_owned().await,
))
}
/// Should only be called when initializing the timeline. Bypasses checks and the layer operation lock.
pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
/// Should only be called when initializing the timeline. Bypasses checks and the layer operation lock.
pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Should only be called when initializing the timeline. Bypasses checks and the layer operation lock.
pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within read path.
pub fn replace_and_verify(
&self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
let mut guard = self.mapping.lock().unwrap();
use super::layer_map::LayerKey;
let key = LayerKey::from(&*expected);
let other = LayerKey::from(&*new);
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
let new_l0 = LayerMap::is_l0(new.layer_desc());
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
));
anyhow::ensure!(
key == other,
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = guard.get_mut(&expected.layer_desc().key()) {
anyhow::ensure!(
layer_map::compare_arced_layers(&expected, layer),
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
);
*layer = new;
Ok(())
} else {
anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
);
}
}
/// Called within the write path. During compaction and image layer creation, we create new layers.
pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within the write path. During GC and compaction, we remove layers and delete them on disk.
/// The logic that deletes the files will be moved here later.
pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
}
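// A minimal sketch of why these locks are `Arc`-wrapped: `lock_owned` yields a `'static` guard
// that can be moved into a spawned task. The spawned body below is a hypothetical stand-in for
// layer file removal, not pageserver code.
#[cfg(test)]
#[tokio::test]
async fn owned_guard_moves_into_spawned_task_sketch() {
    let removal_lock = std::sync::Arc::new(tokio::sync::Mutex::new(()));
    // `lock_owned` consumes a clone of the Arc; the returned guard owns that clone.
    let guard = removal_lock.clone().lock_owned().await;
    tokio::spawn(async move {
        let _held = guard; // keeps GC / compaction / delete mutually exclusive while the task runs
        // ... remove layer files on the I/O pool here ...
    })
    .await
    .unwrap();
}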

View File

@@ -58,7 +58,7 @@ use std::sync::Arc;
use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;
pub use historic_layer_coverage::{LayerKey, Replacement};
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayerDesc;
@@ -66,7 +66,7 @@ use super::storage_layer::PersistentLayerDesc;
///
/// LayerMap tracks what layers exist on a timeline.
///
#[derive(Default)]
#[derive(Default, Clone)]
pub struct LayerMap {
//
// 'open_layer' holds the current InMemoryLayer that is accepting new
@@ -649,6 +649,34 @@ impl LayerMap {
}
}
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc`s point to the same layer, `false` otherwise.
///
/// If comparing persistent layers, ALWAYS compare the layer descriptor key.
#[inline(always)]
pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
// "dyn Trait" objects are "fat pointers" in that they have two components:
// - pointer to the object
// - pointer to the vtable
//
// Rust does not guarantee that these vtables are unique; however, `Arc::ptr_eq`
// as of writing (at least up to 1.67) uses a comparison where both the
// pointer and the vtable need to be equal.
//
// See: https://github.com/rust-lang/rust/issues/103763
//
// A future version of rust will most likely use this form below, where we cast each
// pointer into a pointer to unit, which drops the inaccessible vtable pointer so that
// it does not affect the comparison.
//
// See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
let right = Arc::as_ptr(right) as *const ();
left == right
}
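// A self-contained sketch of the intended semantics: clones of one `Arc` compare equal,
// distinct allocations do not, even when both are fat `Arc<dyn Trait>` pointers.
// `Dummy` and `DummySketchLayer` are hypothetical types for illustration only.
#[cfg(test)]
#[test]
fn compare_arced_layers_sketch() {
    trait Dummy {}
    struct DummySketchLayer;
    impl Dummy for DummySketchLayer {}

    let a: Arc<dyn Dummy> = Arc::new(DummySketchLayer);
    let b = Arc::clone(&a);
    let c: Arc<dyn Dummy> = Arc::new(DummySketchLayer);

    assert!(compare_arced_layers(&a, &b)); // same allocation: equal
    assert!(!compare_arced_layers(&a, &c)); // different allocation: not equal
}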
#[cfg(test)]
mod tests {
use super::LayerMap;
@@ -658,10 +686,7 @@ mod tests {
mod l0_delta_layers_updated {
use crate::tenant::{
storage_layer::{PersistentLayer, PersistentLayerDesc},
timeline::LayerFileManager,
};
use crate::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use super::*;
@@ -694,31 +719,6 @@ mod tests {
)
}
#[test]
fn replacing_missing_l0_is_notfound() {
// the original impl had an oversight: a missing L0 produced an anyhow::Error, but
// anyhow::Error should only happen for precondition failures.
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
let layer = LayerFileName::from_str(layer).unwrap();
let layer = LayerDescriptor::from(layer);
// same skeleton construction; see scenario below
let not_found = Arc::new(layer.clone());
let new_version = Arc::new(layer);
// after the immutable storage state refactor, the replace operation
// will not use the layer map any more. We keep it here for consistency in the
// test cases and can remove it in the future.
let _map = LayerMap::default();
let mut mapping = LayerFileManager::new();
mapping
.replace_and_verify(not_found, new_version)
.unwrap_err();
}
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
let name = LayerFileName::from_str(layer_name).unwrap();
let skeleton = LayerDescriptor::from(name);
@@ -727,7 +727,6 @@ mod tests {
let downloaded = Arc::new(skeleton);
let mut map = LayerMap::default();
let mut mapping = LayerFileManager::new();
// Two disjoint Arcs in different lifecycle phases. Even if it seems they must be the
// same layer, we use LayerMap::compare_arced_layers as the identity of layers.
@@ -737,20 +736,11 @@ mod tests {
map.batch_update()
.insert_historic(remote.layer_desc().clone());
mapping.insert(remote.clone());
assert_eq!(
count_layer_in(&map, remote.layer_desc()),
expected_in_counts
);
mapping
.replace_and_verify(remote, downloaded.clone())
.expect("name derived attributes are the same");
assert_eq!(
count_layer_in(&map, downloaded.layer_desc()),
expected_in_counts
);
map.batch_update()
.remove_historic(downloaded.layer_desc().clone());
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));

View File

@@ -43,6 +43,18 @@ impl Ord for LayerKey {
}
}
impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerKey {
fn from(layer: &'a L) -> Self {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
}
}
}
impl From<&PersistentLayerDesc> for LayerKey {
fn from(layer: &PersistentLayerDesc) -> Self {
let kr = layer.get_key_range();
@@ -60,6 +72,7 @@ impl From<&PersistentLayerDesc> for LayerKey {
/// Allows answering layer map queries very efficiently,
/// but doesn't allow retroactive insertion, which is
/// sometimes necessary. See BufferedHistoricLayerCoverage.
#[derive(Clone)]
pub struct HistoricLayerCoverage<Value> {
/// The latest state
head: LayerCoverageTuple<Value>,
@@ -413,6 +426,7 @@ fn test_persistent_overlapping() {
///
/// See this for more on persistent and retroactive techniques:
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
#[derive(Clone)]
pub struct BufferedHistoricLayerCoverage<Value> {
/// A persistent layer map that we rebuild when we need to retroactively update
historic_coverage: HistoricLayerCoverage<Value>,
@@ -456,6 +470,64 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
self.buffer.insert(layer_key, None);
}
/// Replaces a previous layer with a new layer value.
///
/// The replacement is conditional on:
/// - there is an existing `LayerKey` record
/// - there is no buffered removal for the given `LayerKey`
/// - the given closure returns true for the current `Value`
///
/// The closure is used to compare the latest value (buffered insert, or existing layer)
/// against some expectation. This allows using `Arc::ptr_eq` or similar, which would be
/// inaccessible via `PartialEq` trait.
///
/// Returns a `Replacement` value describing the outcome; only the case of
/// `Replacement::Replaced` modifies the map and requires a rebuild.
///
/// This function is unlikely to be used in the future because LayerMap now only records the
/// layer descriptors. Therefore, anything in the layer map will only ever be added or
/// removed, never replaced.
#[allow(dead_code)]
pub fn replace<F>(
&mut self,
layer_key: &LayerKey,
new: Value,
check_expected: F,
) -> Replacement<Value>
where
F: FnOnce(&Value) -> bool,
{
let (slot, in_buffered) = match self.buffer.get(layer_key) {
Some(inner @ Some(_)) => {
// we compare against the buffered version, because there will be a later
// rebuild before querying
(inner.as_ref(), true)
}
Some(None) => {
// the buffer has a removal for this key; it cannot be deemed equivalent by any check_expected.
return Replacement::RemovalBuffered;
}
None => {
// no pending modification for the key, check layers
(self.layers.get(layer_key), false)
}
};
match slot {
Some(existing) if !check_expected(existing) => {
// unfortunate clone here, but otherwise the nll borrowck grows the region of
// 'a to cover the whole function, and we could not mutate in the other
// Some(existing) branch
Replacement::Unexpected(existing.clone())
}
None => Replacement::NotFound,
Some(_existing) => {
self.insert(layer_key.to_owned(), new);
Replacement::Replaced { in_buffered }
}
}
}
pub fn rebuild(&mut self) {
// Find the first LSN that needs to be rebuilt
let rebuild_since: u64 = match self.buffer.iter().next() {
@@ -524,6 +596,22 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
}
/// Outcome of the replace operation.
#[derive(Debug)]
pub enum Replacement<Value> {
/// Previous value was replaced with the new value.
Replaced {
/// Replacement happened for a scheduled insert.
in_buffered: bool,
},
/// Key was not found in buffered updates or existing layers.
NotFound,
/// Key has been scheduled for removal, it was not replaced.
RemovalBuffered,
/// Previous value was rejected by the closure.
Unexpected(Value),
}
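// A short sketch, in the style of the tests below, of how a caller might branch on each
// outcome. The panic messages are illustrative only; only `Replaced` requires a rebuild.
#[test]
fn replace_outcome_handling_sketch() {
    let mut map = BufferedHistoricLayerCoverage::new();
    let key = LayerKey {
        key: 0..5,
        lsn: 100..101,
        is_image: true,
    };
    map.insert(key.clone(), "Image 1");
    match map.replace(&key, "Remote Image 1", |&l| l == "Image 1") {
        Replacement::Replaced { .. } => map.rebuild(),
        Replacement::NotFound => panic!("layer disappeared before replacement"),
        Replacement::RemovalBuffered => panic!("layer was scheduled for removal"),
        Replacement::Unexpected(other) => panic!("concurrent change: {other:?}"),
    }
    let version = map.get().expect("rebuilt").get_version(102).unwrap();
    assert_eq!(version.image_coverage.query(4), Some("Remote Image 1"));
}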
#[test]
fn test_retroactive_regression_1() {
let mut map = BufferedHistoricLayerCoverage::new();
@@ -632,3 +720,139 @@ fn test_retroactive_simple() {
assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string()));
}
}
#[test]
fn test_retroactive_replacement() {
let mut map = BufferedHistoricLayerCoverage::new();
let keys = [
LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
},
LayerKey {
key: 3..9,
lsn: 110..111,
is_image: true,
},
LayerKey {
key: 4..6,
lsn: 120..121,
is_image: true,
},
];
let layers = [
"Image 1".to_string(),
"Image 2".to_string(),
"Image 3".to_string(),
];
for (key, layer) in keys.iter().zip(layers.iter()) {
map.insert(key.to_owned(), layer.to_owned());
}
// rebuild is not necessary here, because replace works for both buffered updates and existing
// layers.
for (key, orig_layer) in keys.iter().zip(layers.iter()) {
let replacement = format!("Remote {orig_layer}");
// evict
let ret = map.replace(key, replacement.clone(), |l| l == orig_layer);
assert!(
matches!(ret, Replacement::Replaced { .. }),
"replace {orig_layer}: {ret:?}"
);
map.rebuild();
let at = key.lsn.end + 1;
let version = map.get().expect("rebuilt").get_version(at).unwrap();
assert_eq!(
version.image_coverage.query(4).as_deref(),
Some(replacement.as_str()),
"query for 4 at version {at} after eviction",
);
// download
let ret = map.replace(key, orig_layer.clone(), |l| l == &replacement);
assert!(
matches!(ret, Replacement::Replaced { .. }),
"replace {orig_layer} back: {ret:?}"
);
map.rebuild();
let version = map.get().expect("rebuilt").get_version(at).unwrap();
assert_eq!(
version.image_coverage.query(4).as_deref(),
Some(orig_layer.as_str()),
"query for 4 at version {at} after download",
);
}
}
#[test]
fn missing_key_is_not_inserted_with_replace() {
let mut map = BufferedHistoricLayerCoverage::new();
let key = LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
};
let ret = map.replace(&key, "should not replace", |_| true);
assert!(matches!(ret, Replacement::NotFound), "{ret:?}");
map.rebuild();
assert!(map
.get()
.expect("no changes to rebuild")
.get_version(102)
.is_none());
}
#[test]
fn replacing_buffered_insert_and_remove() {
let mut map = BufferedHistoricLayerCoverage::new();
let key = LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
};
map.insert(key.clone(), "Image 1");
let ret = map.replace(&key, "Remote Image 1", |&l| l == "Image 1");
assert!(
matches!(ret, Replacement::Replaced { in_buffered: true }),
"{ret:?}"
);
map.rebuild();
assert_eq!(
map.get()
.expect("rebuilt")
.get_version(102)
.unwrap()
.image_coverage
.query(4),
Some("Remote Image 1")
);
map.remove(key.clone());
let ret = map.replace(&key, "should not replace", |_| true);
assert!(
matches!(ret, Replacement::RemovalBuffered),
"cannot replace after scheduled remove: {ret:?}"
);
map.rebuild();
let ret = map.replace(&key, "should not replace", |_| true);
assert!(
matches!(ret, Replacement::NotFound),
"cannot replace after remove + rebuild: {ret:?}"
);
let at_version = map.get().expect("rebuilt").get_version(102);
assert!(at_version.is_none());
}

View File

@@ -15,6 +15,7 @@ use rpds::RedBlackTreeMapSync;
///
/// NOTE The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
#[derive(Clone)]
pub struct LayerCoverage<Value> {
/// For every change in coverage (as we sweep the key space)
/// we store (lsn.end, value).
@@ -139,6 +140,7 @@ impl<Value: Clone> LayerCoverage<Value> {
}
/// Image and delta coverage at a specific LSN.
#[derive(Clone)]
pub struct LayerCoverageTuple<Value> {
pub image_coverage: LayerCoverage<Value>,
pub delta_coverage: LayerCoverage<Value>,

View File

@@ -0,0 +1,146 @@
//! This module implements `LayerMapMgr`, which manages a layer map object and provides lock-free access to the state.
//!
//! A common usage pattern is as follows:
//!
//! ```ignore
//! async fn compaction(&self) {
//! // Get the current state.
//! let state = self.layer_map_mgr.read();
//! // No lock held at this point. Do compaction based on the state. This part usually incurs I/O operations and may
//! // take a long time.
//! let compaction_result = self.do_compaction(&state).await?;
//! // Update the state.
//! self.layer_map_mgr.update(|mut state| async move {
//! // do updates to the state, return it.
//! Ok(state)
//! }).await?;
//! }
//! ```
use anyhow::Result;
use arc_swap::ArcSwap;
use futures::Future;
use std::sync::Arc;
use super::layer_map::LayerMap;
/// Manages the storage state. Provides utility functions to modify the layer map and to get an
/// immutable reference to the layer map.
pub struct LayerMapMgr {
layer_map: ArcSwap<LayerMap>,
state_lock: tokio::sync::Mutex<()>,
}
impl LayerMapMgr {
/// Get the current state of the layer map.
pub fn read(&self) -> Arc<LayerMap> {
// TODO: it is possible to use `load` to reduce the overhead of cloning the Arc, but the read
// path usually involves disk reads and layer mapping fetches, so skipping that optimization
// is not a big deal here.
self.layer_map.load_full()
}
/// Clone the layer map for modification.
fn clone_for_write(&self, _state_lock_witness: &tokio::sync::MutexGuard<'_, ()>) -> LayerMap {
(**self.layer_map.load()).clone()
}
pub fn new(layer_map: LayerMap) -> Self {
Self {
layer_map: ArcSwap::new(Arc::new(layer_map)),
state_lock: tokio::sync::Mutex::new(()),
}
}
/// Update the layer map.
pub async fn update<O, F>(&self, operation: O) -> Result<()>
where
O: FnOnce(LayerMap) -> F,
F: Future<Output = Result<LayerMap>>,
{
let state_lock = self.state_lock.lock().await;
let state = self.clone_for_write(&state_lock);
let new_state = operation(state).await?;
self.layer_map.store(Arc::new(new_state));
Ok(())
}
}
#[cfg(test)]
mod tests {
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::{repository::Key, tenant::storage_layer::PersistentLayerDesc};
use super::*;
#[tokio::test]
async fn test_layer_map_manage() -> Result<()> {
let mgr = LayerMapMgr::new(Default::default());
mgr.update(|mut map| async move {
let mut updates = map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_img(
TenantId::generate(),
TimelineId::generate(),
Key::from_i128(0)..Key::from_i128(1),
Lsn(0),
false,
0,
));
updates.flush();
Ok(map)
})
.await?;
let ref_1 = mgr.read();
mgr.update(|mut map| async move {
let mut updates = map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_img(
TenantId::generate(),
TimelineId::generate(),
Key::from_i128(1)..Key::from_i128(2),
Lsn(0),
false,
0,
));
updates.flush();
Ok(map)
})
.await?;
let ref_2 = mgr.read();
// Modification should not be visible to the old reference.
assert_eq!(
ref_1
.search(Key::from_i128(0), Lsn(1))
.unwrap()
.layer
.key_range,
Key::from_i128(0)..Key::from_i128(1)
);
assert!(ref_1.search(Key::from_i128(1), Lsn(1)).is_none());
// Modification should be visible to the new reference.
assert_eq!(
ref_2
.search(Key::from_i128(0), Lsn(1))
.unwrap()
.layer
.key_range,
Key::from_i128(0)..Key::from_i128(1)
);
assert_eq!(
ref_2
.search(Key::from_i128(1), Lsn(1))
.unwrap()
.layer
.key_range,
Key::from_i128(1)..Key::from_i128(2)
);
Ok(())
}
}

View File

@@ -675,7 +675,7 @@ pub async fn immediate_gc(
.get(&tenant_id)
.map(Arc::clone)
.with_context(|| format!("tenant {tenant_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?;
.map_err(ApiError::NotFound)?;
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
@@ -724,11 +724,11 @@ pub async fn immediate_compact(
.get(&tenant_id)
.map(Arc::clone)
.with_context(|| format!("tenant {tenant_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?;
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))?;
.map_err(ApiError::NotFound)?;
// Run in task_mgr to avoid race with tenant_detach operation
let ctx = ctx.detached_child(TaskKind::Compaction, DownloadBehavior::Download);

View File

@@ -753,18 +753,22 @@ impl RemoteTimelineClient {
// Have a failpoint that can use the `pause` failpoint action.
// We don't want to block the executor thread, hence, spawn_blocking + await.
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint persist_deleted_index_part");
fail::fail_point!("persist_deleted_index_part");
}
})
.await
.expect("spawn_blocking");
}
#[cfg(feature = "testing")]
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!(
"at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause"
);
fail::fail_point!(
"persist_index_part_with_deleted_flag_after_set_before_upload_pause"
);
}
})
.await
.expect("spawn_blocking");
upload::upload_index_part(
self.conf,
&self.storage_impl,
@@ -862,8 +866,10 @@ impl RemoteTimelineClient {
"Found {} files not bound to index_file.json, proceeding with their deletion",
remaining.len()
);
warn!("About to remove {} files", remaining.len());
self.storage_impl.delete_objects(&remaining).await?;
for file in remaining {
warn!("Removing {}", file.object_name().unwrap_or_default());
self.storage_impl.delete(&file).await?;
}
}
let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));
@@ -1365,7 +1371,7 @@ mod tests {
struct TestSetup {
runtime: &'static tokio::runtime::Runtime,
entered_runtime: EnterGuard<'static>,
harness: TenantHarness,
harness: TenantHarness<'static>,
tenant: Arc<Tenant>,
tenant_ctx: RequestContext,
remote_fs_dir: PathBuf,

View File

@@ -12,7 +12,7 @@ use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::{Context, Result};
use anyhow::Result;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
@@ -24,7 +24,7 @@ use pageserver_api::models::{
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter;
use utils::rate_limit::RateLimit;
@@ -41,8 +41,6 @@ pub use inmemory_layer::InMemoryLayer;
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use remote_layer::RemoteLayer;
use super::layer_map::BatchedUpdates;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
T: PartialOrd<T>,
@@ -176,16 +174,9 @@ impl LayerAccessStats {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn for_loading_layer(
layer_map_lock_held_witness: &BatchedUpdates<'_>,
status: LayerResidenceStatus,
) -> Self {
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(
layer_map_lock_held_witness,
status,
LayerResidenceEventReason::LayerLoad,
);
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
new
}
@@ -196,7 +187,6 @@ impl LayerAccessStats {
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn clone_for_residence_change(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_>,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
let clone = {
@@ -204,11 +194,7 @@ impl LayerAccessStats {
inner.clone()
};
let new = LayerAccessStats(Mutex::new(clone));
new.record_residence_event(
layer_map_lock_held_witness,
new_status,
LayerResidenceEventReason::ResidenceChange,
);
new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
new
}
@@ -228,7 +214,6 @@ impl LayerAccessStats {
///
pub(crate) fn record_residence_event(
&self,
_layer_map_lock_held_witness: &BatchedUpdates<'_>,
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) {
@@ -335,8 +320,7 @@ impl LayerAccessStats {
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known from the context in which the layers
/// are used (the timeline).
#[async_trait::async_trait]
pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
pub trait Layer: std::fmt::Debug + Send + Sync {
/// Range of keys that this layer covers
fn get_key_range(&self) -> Range<Key>;
@@ -366,74 +350,13 @@ pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)>;
/// CANCEL SAFETY: if the returned future is dropped,
/// the wrapped closure still runs to completion and the return value is discarded.
/// For the case of get_value_reconstruct_data, we expect the closure to not
/// have any side effects, as it only attempts to read a layer (and stuff like
/// page cache isn't considered a real side effect).
/// But, ...
/// TRACING:
/// If the returned future is cancelled, the spawn_blocking span can outlive
/// the caller's span.
/// So, technically, we should be using `parent: None` and `follows_from: current`
/// instead. However, in practice, the advantage of maintaining the span stack
/// in logs outweighs the disadvantage of having a dangling span in a case that
/// is not expected to happen because in pageserver we generally don't drop pending futures.
async fn get_value_reconstruct_data(
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking");
static USE_SPAWN_BLOCKING: Lazy<bool> = Lazy::new(|| {
let val = std::env::var("PAGESERVER_LAYER_GET_RECONSTRUCT_DATA_USE_SPAWN_BLOCKING")
.map(|s| s == "1")
.unwrap_or(false);
tracing::info!("PAGESERVER_LAYER_GET_RECONSTRUCT_DATA_USE_SPAWN_BLOCKING={val}");
val
});
let use_spawn_blocking = *USE_SPAWN_BLOCKING;
let start = Instant::now();
let res = if !use_spawn_blocking {
anyhow::Ok(self.get_value_reconstruct_data_blocking(
key,
lsn_range,
reconstruct_data,
ctx,
))
} else {
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_STARTED_COUNT.inc();
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE.inc();
let res = tokio::task::spawn_blocking(move || {
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_QUEUE_DELAY
.observe(start.elapsed().as_secs_f64());
let _enter = span.enter();
self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
})
.await
.context("spawn_blocking");
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE.dec();
res
};
let histo = match &res {
Ok(Ok(_)) => &crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_OK,
Ok(Err(_)) | Err(_) => {
&crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_ERROR
}
};
histo.observe(start.elapsed().as_secs_f64());
res?
}
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
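// A hedged sketch of the calling convention (illustrative, not part of the trait): the caller
// threads one ValueReconstructState through successive layers, newest first, until a layer
// reports Complete. Assumes `ValueReconstructState` has a usable default.
//
//     let mut state = ValueReconstructState::default();
//     for layer in layers_newest_first {
//         match layer.get_value_reconstruct_data(key, lsn_range.clone(), &mut state, ctx)? {
//             ValueReconstructResult::Complete => break,    // all data collected
//             ValueReconstructResult::Continue => continue, // consult an older layer
//             ValueReconstructResult::Missing => anyhow::bail!("key not found"),
//         }
//     }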
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
@@ -545,8 +468,17 @@ pub mod tests {
}
}
#[async_trait::async_trait]
impl Layer for LayerDescriptor {
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
@@ -561,16 +493,6 @@ pub mod tests {
self.layer_desc().lsn_range.clone()
}
fn get_value_reconstruct_data_blocking(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
todo!("This method shouldn't be part of the Layer trait")
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental

View File

@@ -218,7 +218,6 @@ impl std::fmt::Debug for DeltaLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for DeltaLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -295,13 +294,13 @@ impl Layer for DeltaLayer {
Ok(())
}
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.desc.lsn_range.start);
let mut need_image = true;
@@ -309,7 +308,7 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = &inner.file;
@@ -375,9 +374,9 @@ impl Layer for DeltaLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
Ok(ValueReconstructResult::Continue)
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
Ok(ValueReconstructResult::Complete)
}
}

View File

@@ -149,7 +149,6 @@ impl std::fmt::Debug for ImageLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for ImageLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -182,18 +181,18 @@ impl Layer for ImageLayer {
}
/// Look up given page in the file
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.desc.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -211,9 +210,9 @@ impl Layer for ImageLayer {
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok((reconstruct_state, ValueReconstructResult::Complete))
Ok(ValueReconstructResult::Complete)
} else {
Ok((reconstruct_state, ValueReconstructResult::Missing))
Ok(ValueReconstructResult::Missing)
}
}

View File

@@ -110,7 +110,6 @@ impl InMemoryLayer {
}
}
#[async_trait::async_trait]
impl Layer for InMemoryLayer {
fn get_key_range(&self) -> Range<Key> {
Key::MIN..Key::MAX
@@ -191,13 +190,13 @@ impl Layer for InMemoryLayer {
}
/// Look up given value in the layer.
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
mut reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
@@ -214,7 +213,7 @@ impl Layer for InMemoryLayer {
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok((reconstruct_state, ValueReconstructResult::Complete));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
@@ -234,9 +233,9 @@ impl Layer for InMemoryLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
Ok(ValueReconstructResult::Continue)
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
Ok(ValueReconstructResult::Complete)
}
}
}

View File

@@ -4,9 +4,8 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
@@ -21,7 +20,7 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName};
use super::{
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
@@ -63,15 +62,14 @@ impl std::fmt::Debug for RemoteLayer {
}
}
#[async_trait::async_trait]
impl Layer for RemoteLayer {
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
@@ -221,7 +219,6 @@ impl RemoteLayer {
/// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_>,
conf: &'static PageServerConf,
file_size: u64,
) -> Arc<dyn PersistentLayer> {
@@ -233,10 +230,8 @@ impl RemoteLayer {
self.desc.tenant_id,
&fname,
file_size,
self.access_stats.clone_for_residence_change(
layer_map_lock_held_witness,
LayerResidenceStatus::Resident,
),
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
))
} else {
let fname = self.desc.image_file_name();
@@ -246,10 +241,8 @@ impl RemoteLayer {
self.desc.tenant_id,
&fname,
file_size,
self.access_stats.clone_for_residence_change(
layer_map_lock_held_witness,
LayerResidenceStatus::Resident,
),
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
))
}
}

File diff suppressed because it is too large

View File

@@ -197,11 +197,11 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let guard = self.lcache.layer_in_use_read().await;
let layers = self.layer_mgr.read();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
let hist_layer = mapping.get_from_desc(&hist_layer);
let hist_layer = self.lcache.get_from_desc(&hist_layer);
if hist_layer.is_remote_layer() {
continue;
}

View File

@@ -1321,7 +1321,7 @@ mod tests {
const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)

View File

@@ -25,7 +25,7 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::{Context, Result};
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
@@ -1082,10 +1082,7 @@ impl<'a> WalIngest<'a> {
.await?
{
// create it with 0 size initially, the logic below will extend it
modification
.put_rel_creation(rel, 0, ctx)
.await
.context("Relation Error")?;
modification.put_rel_creation(rel, 0, ctx).await?;
0
} else {
self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?

View File

@@ -2675,6 +2675,7 @@ bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
XLogRecPtr end_recptr = record->EndRecPtr;
XLogRecPtr prev_end_recptr = record->ReadRecPtr - 1;
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
@@ -2718,15 +2719,16 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
no_redo_needed = buffer < 0;
/* In both cases set lwLsn past this WAL record */
SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
/* we don't have the buffer in memory, update lwLsn past this record,
* also evict the page from the file cache
*/
/* we don't have the buffer in memory, update lwLsn past this record */
if (no_redo_needed)
{
SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
lfc_evict(rnode, forknum, blkno);
}
else
{
SetLastWrittenLSNForBlock(prev_end_recptr, rnode, forknum, blkno);
}
LWLockRelease(partitionLock);
@@ -2734,10 +2736,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
if (get_cached_relsize(rnode, forknum, &relsize))
{
if (relsize < blkno + 1)
{
update_cached_relsize(rnode, forknum, blkno + 1);
SetLastWrittenLSNForRelation(end_recptr, rnode, forknum);
}
}
else
{
@@ -2769,7 +2768,6 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
Assert(nbresponse->n_blocks > blkno);
set_cached_relsize(rnode, forknum, nbresponse->n_blocks);
SetLastWrittenLSNForRelation(end_recptr, rnode, forknum);
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
}

View File

@@ -257,7 +257,7 @@ nwp_register_gucs(void)
"Walproposer reconnects to offline safekeepers once in this interval.",
NULL,
&wal_acceptor_reconnect_timeout,
1000, 0, INT_MAX, /* default, min, max */
5000, 0, INT_MAX, /* default, min, max */
PGC_SIGHUP, /* context */
GUC_UNIT_MS, /* flags */
NULL, NULL, NULL);

View File

@@ -1,6 +1,5 @@
use futures::pin_mut;
use futures::StreamExt;
use futures::TryFutureExt;
use hyper::body::HttpBody;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
@@ -12,13 +11,8 @@ use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::Row;
use tracing::error;
use tracing::info;
use tracing::instrument;
use url::Url;
use crate::proxy::invalidate_cache;
use crate::proxy::NUM_RETRIES_WAKE_COMPUTE;
use crate::{auth, config::ProxyConfig, console};
#[derive(serde::Deserialize)]
@@ -96,17 +90,10 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
}
}
struct ConnInfo {
username: String,
dbname: String,
hostname: String,
password: String,
}
fn get_conn_info(
headers: &HeaderMap,
sni_hostname: Option<String>,
) -> Result<ConnInfo, anyhow::Error> {
) -> Result<(String, String, String, String), anyhow::Error> {
let connection_string = headers
.get("Neon-Connection-String")
.ok_or(anyhow::anyhow!("missing connection string"))?
@@ -159,12 +146,12 @@ fn get_conn_info(
}
}
Ok(ConnInfo {
username: username.to_owned(),
dbname: dbname.to_owned(),
hostname: hostname.to_owned(),
password: password.to_owned(),
})
Ok((
username.to_owned(),
dbname.to_owned(),
hostname.to_owned(),
password.to_owned(),
))
}
// TODO: return different http error codes
@@ -177,10 +164,10 @@ pub async fn handle(
// Determine the destination and connection params
//
let headers = request.headers();
let conn_info = get_conn_info(headers, sni_hostname)?;
let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?;
let credential_params = StartupMessageParams::new([
("user", &conn_info.username),
("database", &conn_info.dbname),
("user", &username),
("database", &dbname),
("application_name", APP_NAME),
]);
@@ -199,20 +186,21 @@ pub async fn handle(
let creds = config
.auth_backend
.as_ref()
.map(|_| {
auth::ClientCredentials::parse(
&credential_params,
Some(&conn_info.hostname),
common_names,
)
})
.map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names))
.transpose()?;
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
};
let mut node_info = creds.wake_compute(&extra).await?.expect("msg");
let node = creds.wake_compute(&extra).await?.expect("msg");
let conf = node.value.config;
let port = *conf.get_ports().first().expect("no port");
let host = match conf.get_hosts().first().expect("no host") {
tokio_postgres::config::Host::Tcp(host) => host,
tokio_postgres::config::Host::Unix(_) => {
return Err(anyhow::anyhow!("unix socket is not supported"));
}
};
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,
@@ -232,10 +220,28 @@ pub async fn handle(
let QueryData { query, params } = serde_json::from_slice(&body)?;
let query_params = json_to_pg_text(params)?;
//
// Connect to the destination
//
let (client, connection) = tokio_postgres::Config::new()
.host(host)
.port(port)
.user(&username)
.password(&password)
.dbname(&dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
//
// Now execute the query and return the result
//
let client = connect_to_compute(&mut node_info, &extra, &creds, &conn_info).await?;
let row_stream = client.query_raw_txt(query, query_params).await?;
// Manually drain the stream into a vector to leave row_stream hanging
@@ -274,11 +280,6 @@ pub async fn handle(
json!({
"name": Value::String(c.name().to_owned()),
"dataTypeID": Value::Number(c.type_().oid().into()),
"tableID": c.table_oid(),
"columnID": c.column_id(),
"dataTypeSize": c.type_size(),
"dataTypeModifier": c.type_modifier(),
"format": "text",
})
})
.collect::<Vec<_>>()
@@ -302,70 +303,6 @@ pub async fn handle(
}))
}
/// This function is a copy of `connect_to_compute` from `src/proxy.rs` with
/// the difference that it uses `tokio_postgres` for the connection.
#[instrument(skip_all)]
async fn connect_to_compute(
node_info: &mut console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
loop {
match connect_to_compute_once(node_info, conn_info).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(extra).await? {
// Update `node_info` and try one more time.
Some(new) => {
*node_info = new;
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
}
}
other => return other,
}
num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
}
}
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let mut config = (*node_info.config).clone();
let (client, connection) = config
.user(&conn_info.username)
.password(&conn_info.password)
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.inspect_err(|e: &tokio_postgres::Error| {
error!(
"failed to connect to compute node hosts={:?} ports={:?}: {}",
node_info.config.get_hosts(),
node_info.config.get_ports(),
e
);
invalidate_cache(node_info)
})
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
Ok(client)
}
//
// Convert postgres row with text-encoded values to JSON object
//

View File

@@ -26,6 +26,7 @@ use tls_listener::TlsListener;
use tokio::{
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
net::TcpListener,
select,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
@@ -192,9 +193,14 @@ async fn ws_handler(
// TODO: this deserves a refactor, as this function now also handles the HTTP JSON client besides websockets.
// Right now I don't want to blow up the sql-over-http patch with file renames, so that will be done as a follow-up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
let result = sql_over_http::handle(config, request, sni_hostname)
.instrument(info_span!("sql-over-http"))
.await;
let result = select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
Err(anyhow::anyhow!("Query timed out"))
}
response = sql_over_http::handle(config, request, sni_hostname) => {
response
}
};
let status_code = match result {
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::BAD_REQUEST,
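// A hedged alternative sketch: the same 10-second cap could be written with
// `tokio::time::timeout` instead of `select!` (same semantics, one fewer arm):
//
//     let result = match tokio::time::timeout(
//         std::time::Duration::from_secs(10),
//         sql_over_http::handle(config, request, sni_hostname),
//     )
//     .await
//     {
//         Ok(response) => response,
//         Err(_elapsed) => Err(anyhow::anyhow!("Query timed out")),
//     };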

View File

@@ -22,7 +22,7 @@ use tracing::{error, info, warn};
use utils::measured_stream::MeasuredStream;
/// Number of times we should retry the `/proxy_wake_compute` http request.
pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -283,35 +283,34 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
}
/// If we couldn't connect, a cached connection info might be to blame
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
#[tracing::instrument(name = "invalidate_cache", skip_all)]
pub fn invalidate_cache(node_info: &console::CachedNodeInfo) {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
node_info.invalidate();
}
let label = match is_cached {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
) -> Result<PostgresConnection, compute::ConnectionError> {
// If we couldn't connect, a cached connection info might be to blame
// (e.g. the compute node's address might've changed at the wrong time).
// Invalidate the cache entry (if any) to prevent subsequent errors.
let invalidate_cache = |_: &compute::ConnectionError| {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
node_info.invalidate();
}
let label = match is_cached {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
};
let allow_self_signed_compute = node_info.allow_self_signed_compute;
node_info
.config
.connect(allow_self_signed_compute)
.inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info))
.inspect_err(invalidate_cache)
.await
}

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.70.0"
channel = "1.68.2"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -266,7 +266,7 @@ impl From<TimelineError> for ApiError {
fn from(te: TimelineError) -> ApiError {
match te {
TimelineError::NotFound(ttid) => {
ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
ApiError::NotFound(anyhow!("timeline {} not found", ttid))
}
_ => ApiError::InternalServerError(anyhow!("{}", te)),
}

scripts/comment-test-report.js Executable file → Normal file
View File

@@ -1,5 +1,3 @@
#! /usr/bin/env node
//
// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch.
//
@@ -21,7 +19,7 @@
// })
//
// Equivalent of Python's defaultdict.
// Analog of Python's defaultdict.
//
// const dm = new DefaultMap(() => new DefaultMap(() => []))
// dm["firstKey"]["secondKey"].push("value")
@@ -34,7 +32,34 @@ class DefaultMap extends Map {
}
}
const parseReportJson = async ({ reportJsonUrl, fetch }) => {
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// If we run the script in the PR or in the branch (main/release/...)
const isPullRequest = !!context.payload.pull_request
// Latest commit in PR or in the branch
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
// Comment body itself
let commentBody = `${startMarker}\n`
// Common parameters for GitHub API requests
const ownerRepoParams = {
owner: context.repo.owner,
repo: context.repo.repo,
}
const {reportUrl, reportJsonUrl} = report
if (!reportUrl || !reportJsonUrl) {
commentBody += `#### No tests were run or test report is not available\n`
commentBody += autoupdateNotice
return
}
const suites = await (await fetch(reportJsonUrl)).json()
// Allure distinguishes "failed" (with an assertion error) and "broken" (with any other error) tests.
@@ -58,7 +83,7 @@ const parseReportJson = async ({ reportJsonUrl, fetch }) => {
let buildType, pgVersion
const match = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)?.groups
if (match) {
({ buildType, pgVersion } = match)
({buildType, pgVersion} = match)
} else {
// It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance).
console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`)
@@ -98,68 +123,37 @@ const parseReportJson = async ({ reportJsonUrl, fetch }) => {
}
}
return {
failedTests,
failedTestsCount,
passedTests,
passedTestsCount,
skippedTests,
skippedTestsCount,
flakyTests,
flakyTestsCount,
retriedTests,
pgVersions,
}
}
const reportSummary = async (params) => {
const {
failedTests,
failedTestsCount,
passedTests,
passedTestsCount,
skippedTests,
skippedTestsCount,
flakyTests,
flakyTestsCount,
retriedTests,
pgVersions,
reportUrl,
} = params
let summary = ""
const totalTestsCount = failedTestsCount + passedTestsCount + skippedTestsCount
summary += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n`
commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n`
// Print test results from the newest to the oldest Postgres version for release and debug builds.
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
if (Object.keys(failedTests[pgVersion]).length > 0) {
summary += `#### Failures on Postgres ${pgVersion}\n\n`
commentBody += `#### Failures on Postgres ${pgVersion}\n\n`
for (const [testName, tests] of Object.entries(failedTests[pgVersion])) {
const links = []
for (const test of tests) {
const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}`
links.push(`[${test.buildType}](${allureLink})`)
}
summary += `- \`${testName}\`: ${links.join(", ")}\n`
commentBody += `- \`${testName}\`: ${links.join(", ")}\n`
}
const testsToRerun = Object.values(failedTests[pgVersion]).map(x => x[0].name)
const command = `DEFAULT_PG_VERSION=${pgVersion} scripts/pytest -k "${testsToRerun.join(" or ")}"`
summary += "```\n"
summary += `# Run failed on Postgres ${pgVersion} tests locally:\n`
summary += `${command}\n`
summary += "```\n"
commentBody += "```\n"
commentBody += `# Run failed on Postgres ${pgVersion} tests locally:\n`
commentBody += `${command}\n`
commentBody += "```\n"
}
}
if (flakyTestsCount > 0) {
summary += `<details>\n<summary>Flaky tests (${flakyTestsCount})</summary>\n\n`
commentBody += `<details>\n<summary>Flaky tests (${flakyTestsCount})</summary>\n\n`
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
if (Object.keys(flakyTests[pgVersion]).length > 0) {
summary += `#### Postgres ${pgVersion}\n\n`
commentBody += `#### Postgres ${pgVersion}\n\n`
for (const [testName, tests] of Object.entries(flakyTests[pgVersion])) {
const links = []
for (const test of tests) {
@@ -167,57 +161,11 @@ const reportSummary = async (params) => {
const status = test.status === "passed" ? ":white_check_mark:" : ":x:"
links.push(`[${status} ${test.buildType}](${allureLink})`)
}
summary += `- \`${testName}\`: ${links.join(", ")}\n`
commentBody += `- \`${testName}\`: ${links.join(", ")}\n`
}
}
}
summary += "\n</details>\n"
}
return summary
}
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// If we run the script in the PR or in the branch (main/release/...)
const isPullRequest = !!context.payload.pull_request
// Latest commit in PR or in the branch
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
// Commend body itself
let commentBody = `${startMarker}\n`
// Common parameters for GitHub API requests
const ownerRepoParams = {
owner: context.repo.owner,
repo: context.repo.repo,
}
const {reportUrl, reportJsonUrl} = report
if (!reportUrl || !reportJsonUrl) {
commentBody += `#### No tests were run or test report is not available\n`
commentBody += autoupdateNotice
return
}
try {
const parsed = await parseReportJson({ reportJsonUrl, fetch })
commentBody += await reportSummary({ ...parsed, reportUrl })
} catch (error) {
commentBody += `### [full report](${reportUrl})\n___\n`
commentBody += `#### Failed to create a summary for the test run: \n`
commentBody += "```\n"
commentBody += `${error.stack}\n`
commentBody += "```\n"
commentBody += "\nTo reproduce and debug the error locally run:\n"
commentBody += "```\n"
commentBody += `scripts/comment-test-report.js ${reportJsonUrl}`
commentBody += "\n```\n"
commentBody += "\n</details>\n"
}
commentBody += autoupdateNotice
@@ -259,60 +207,3 @@ module.exports = async ({ github, context, fetch, report }) => {
})
}
}
// Equivalent of Python's `if __name__ == "__main__":`
// https://nodejs.org/docs/latest/api/modules.html#accessing-the-main-module
if (require.main === module) {
// Poor man's argument parsing: we expect the third argument to be a JSON URL (0: node binary, 1: this script, 2: JSON URL)
if (process.argv.length !== 3) {
console.error(`Unexpected number of arguments\nUsage: node ${process.argv[1]} <jsonUrl>`)
process.exit(1)
}
const jsonUrl = process.argv[2]
try {
new URL(jsonUrl)
} catch (error) {
console.error(`Invalid URL: ${jsonUrl}\nUsage: node ${process.argv[1]} <jsonUrl>`)
process.exit(1)
}
const htmlUrl = jsonUrl.replace("/data/suites.json", "/index.html")
const githubMock = {
rest: {
issues: {
createComment: console.log,
listComments: async () => ({ data: [] }),
updateComment: console.log
},
repos: {
createCommitComment: console.log,
listCommentsForCommit: async () => ({ data: [] }),
updateCommitComment: console.log
}
}
}
const contextMock = {
repo: {
owner: 'testOwner',
repo: 'testRepo'
},
payload: {
number: 42,
pull_request: null,
},
sha: '0000000000000000000000000000000000000000',
}
module.exports({
github: githubMock,
context: contextMock,
fetch: fetch,
report: {
reportUrl: htmlUrl,
reportJsonUrl: jsonUrl,
}
})
}

View File

@@ -1,14 +1,12 @@
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import sys
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
import backoff
import psycopg2
import psycopg2.extras
@@ -37,20 +35,9 @@ def get_connection_cursor():
connstr = os.getenv("DATABASE_URL")
if not connstr:
err("DATABASE_URL environment variable is not set")
@backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
def connect(connstr):
conn = psycopg2.connect(connstr, connect_timeout=30)
conn.autocommit = True
return conn
conn = connect(connstr)
try:
with psycopg2.connect(connstr, connect_timeout=30) as conn:
with conn.cursor() as cur:
yield cur
finally:
if conn is not None:
conn.close()
def create_table(cur):
@@ -128,7 +115,6 @@ def main():
parser.add_argument(
"--ingest",
type=Path,
required=True,
help="Path to perf test result file, or directory with perf test result files",
)
parser.add_argument("--initdb", action="store_true", help="Initialuze database")
@@ -154,5 +140,4 @@ def main():
if __name__ == "__main__":
logging.getLogger("backoff").addHandler(logging.StreamHandler())
main()

View File

@@ -1,13 +1,11 @@
#!/usr/bin/env python3
import argparse
import logging
import os
import re
import sys
from contextlib import contextmanager
from pathlib import Path
import backoff
import psycopg2
CREATE_TABLE = """
@@ -31,20 +29,9 @@ def get_connection_cursor():
connstr = os.getenv("DATABASE_URL")
if not connstr:
err("DATABASE_URL environment variable is not set")
@backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
def connect(connstr):
conn = psycopg2.connect(connstr, connect_timeout=30)
conn.autocommit = True
return conn
conn = connect(connstr)
try:
with psycopg2.connect(connstr, connect_timeout=30) as conn:
with conn.cursor() as cur:
yield cur
finally:
if conn is not None:
conn.close()
def create_table(cur):
@@ -114,5 +101,4 @@ def main():
if __name__ == "__main__":
logging.getLogger("backoff").addHandler(logging.StreamHandler())
main()

View File

@@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import Any, List, MutableMapping, cast
from typing import List
import pytest
from _pytest.config import Config
@@ -56,15 +56,3 @@ def pytest_collection_modifyitems(config: Config, items: List[pytest.Item]):
# Rerun 3 times = 1 original run + 2 reruns
log.info(f"Marking {item.nodeid} as flaky. It will be rerun up to 3 times")
item.add_marker(pytest.mark.flaky(reruns=2))
# pytest-rerunfailures is not compatible with pytest-timeout (timeout is not set for reruns),
# we can workaround it by setting `timeout_func_only` to True[1].
# Unfortunately, setting `timeout_func_only = True` globally in pytest.ini is broken[2],
# but we still can do it using pytest marker.
#
# - [1] https://github.com/pytest-dev/pytest-rerunfailures/issues/99
# - [2] https://github.com/pytest-dev/pytest-timeout/issues/142
timeout_marker = item.get_closest_marker("timeout")
if timeout_marker is not None:
kwargs = cast(MutableMapping[str, Any], timeout_marker.kwargs)
kwargs["func_only"] = True

View File

@@ -57,17 +57,14 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_current_logical_size",
"pageserver_resident_physical_size",
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
"pageserver_getpage_get_reconstruct_data_seconds_bucket",
"pageserver_getpage_get_reconstruct_data_seconds_count",
"pageserver_getpage_get_reconstruct_data_seconds_sum",
@@ -76,6 +73,8 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_io_operations_seconds_count",
"pageserver_io_operations_seconds_sum",
"pageserver_last_record_lsn",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_read_num_fs_layers_bucket",
"pageserver_read_num_fs_layers_count",
"pageserver_read_num_fs_layers_sum",

View File

@@ -1631,8 +1631,6 @@ class NeonPageserver(PgProtocol):
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
".*Error processing HTTP request: NotFound: Timeline .* was not found",
]
def start(
@@ -2415,17 +2413,6 @@ class Endpoint(PgProtocol):
return self
def respec(self, **kwargs):
"""Update the endpoint.json file used by control_plane."""
# Read config
config_path = os.path.join(self.endpoint_path(), "endpoint.json")
with open(config_path, "r") as f:
data_dict = json.load(f)
# Write it back updated
with open(config_path, "w") as file:
json.dump(dict(data_dict, **kwargs), file, indent=4)
def stop(self) -> "Endpoint":
"""
Stop the Postgres instance if it's running.

View File

@@ -342,11 +342,6 @@ class PageserverHttpClient(requests.Session):
return res_json
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs):
"""
Note that deletion is not instant, it is scheduled and performed mostly in the background.
So if you need to wait for it to complete use `timeline_delete_wait_completed`.
For longer description consult with pageserver openapi spec.
"""
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs
)

View File

@@ -193,30 +193,19 @@ def wait_for_upload_queue_empty(
time.sleep(0.2)
def wait_timeline_detail_404(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
last_exc = None
for _ in range(2):
time.sleep(0.250)
try:
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
log.error(f"detail {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code == 404:
return
last_exc = e
raise last_exc or RuntimeError(f"Timeline wasn't deleted in time, state: {data['state']}")
def timeline_delete_wait_completed(
def assert_timeline_detail_404(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
**delete_args,
):
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id)
"""Asserts that timeline_detail returns 404, or dumps the detail."""
try:
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
log.error(f"detail {data}")
except PageserverApiException as e:
log.error(e)
if e.status_code == 404:
return
else:
raise
raise Exception("detail succeeded (it should return 404)")

View File

@@ -32,18 +32,13 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
env.neon_cli.create_branch("test_startup")
endpoint = None
# We do two iterations so we can see if the second startup is faster. It should
# be because the compute node should already be configured with roles, databases,
# extensions, etc from the first run.
for i in range(2):
# Start
with zenbenchmark.record_duration(f"{i}_start_and_select"):
if endpoint:
endpoint.start()
else:
endpoint = env.endpoints.create_start("test_startup")
endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Get metrics
@@ -62,9 +57,6 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
# Stop so we can restart
endpoint.stop()
# Imitate optimizations that console would do for the second start
endpoint.respec(skip_pg_catalog_updates=True)
# This test sometimes runs for longer than the global 5 minute timeout.
@pytest.mark.timeout(600)

View File

@@ -1,2 +1,2 @@
pg8000==1.29.8
pg8000==1.29.4
scramp>=1.4.3

View File

@@ -396,9 +396,9 @@ checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
[[package]]
name = "openssl"
version = "0.10.55"
version = "0.10.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d"
checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56"
dependencies = [
"bitflags",
"cfg-if",
@@ -428,9 +428,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.90"
version = "0.9.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6"
checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e"
dependencies = [
"cc",
"libc",

View File

@@ -1,4 +1,4 @@
FROM rust:1.70
FROM rust:1.69
WORKDIR /source
COPY . .

View File

@@ -5,8 +5,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/vapor/postgres-nio.git",
"state" : {
"revision" : "061a0836d7c1887e04a975d1d2eaa2ef5fd7dfab",
"version" : "1.16.0"
"revision" : "dbf9c2eb596df39cba8ff3f74d74b2e6a31bd937",
"version" : "1.14.1"
}
},
{
@@ -59,8 +59,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio.git",
"state" : {
"revision" : "6213ba7a06febe8fef60563a4a7d26a4085783cf",
"version" : "2.54.0"
"revision" : "d1690f85419fdac8d54e350fb6d2ab9fd95afd75",
"version" : "2.51.1"
}
},
{

View File

@@ -4,7 +4,7 @@ import PackageDescription
let package = Package(
name: "PostgresNIOExample",
dependencies: [
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.16.0")
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.14.1")
],
targets: [
.executableTarget(

View File

@@ -5,7 +5,23 @@
"packages": {
"": {
"dependencies": {
"postgresql-client": "2.5.9"
"postgresql-client": "2.5.5"
}
},
"node_modules/debug": {
"version": "4.3.4",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz",
"integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==",
"dependencies": {
"ms": "2.1.2"
},
"engines": {
"node": ">=6.0"
},
"peerDependenciesMeta": {
"supports-color": {
"optional": true
}
}
},
"node_modules/doublylinked": {
@@ -25,6 +41,11 @@
"putil-promisify": "^1.8.6"
}
},
"node_modules/ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"node_modules/obuf": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
@@ -42,28 +63,30 @@
}
},
"node_modules/postgresql-client": {
"version": "2.5.9",
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.5.9.tgz",
"integrity": "sha512-s+kgTN6TfWLzehEyxw4Im4odnxVRCbZ0DEJzWS6SLowPAmB2m1/DOiOvZC0+ZVoi5AfbGE6SBqFxKguSyVAXZg==",
"version": "2.5.5",
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.5.5.tgz",
"integrity": "sha512-2Mu3i+6NQ9cnkoZNd0XeSZo9WoUpuWf4ZSiCCoDWSj82T93py2/SKXZ1aUaP8mVaU0oKpyyGe0IwLYZ1VHShnA==",
"dependencies": {
"debug": "^4.3.4",
"doublylinked": "^2.5.2",
"lightning-pool": "^4.2.1",
"postgres-bytea": "^3.0.0",
"power-tasks": "^1.7.0",
"power-tasks": "^1.6.4",
"putil-merge": "^3.10.3",
"putil-promisify": "^1.10.0",
"putil-varhelpers": "^1.6.5"
},
"engines": {
"node": ">=16.0",
"node": ">=14.0",
"npm": ">=7.0.0"
}
},
"node_modules/power-tasks": {
"version": "1.7.0",
"resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-1.7.0.tgz",
"integrity": "sha512-rndZXCDxhuIDjPUJJvQwBDHaYagCkjvbPF/NA+omh/Ef4rAI9KtnvdA0k98dyiGpn1zXOpc6c2c0JWzg/xAhJg==",
"version": "1.6.4",
"resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-1.6.4.tgz",
"integrity": "sha512-LX8GGgEIP1N7jsZqlqZ275e6f1Ehq97APCEGj8uVO0NoEoB+77QUX12BFv3LmlNKfq4fIuNSPiHhyHFjqn2gfA==",
"dependencies": {
"debug": "^4.3.4",
"doublylinked": "^2.5.2",
"strict-typed-events": "^2.3.1"
},
@@ -109,9 +132,9 @@
}
},
"node_modules/ts-gems": {
"version": "2.4.0",
"resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.4.0.tgz",
"integrity": "sha512-SdugYAXoWvbqrxLodIObzxhEKacDxh5LfAJIiIkiH7q5thvuuCzdmkdTVQYf7uEDrEpPhfx4tokDMamdO3be9A=="
"version": "2.3.0",
"resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.3.0.tgz",
"integrity": "sha512-bUvrwrzlct7vfaNvtgMhynDf6lAki/kTtrNsIGhX6l7GJGK3s6b8Ro7dazOLXabV0m2jyShBzDQ8X1+h/C2Cug=="
}
}
}

View File

@@ -1,6 +1,6 @@
{
"type": "module",
"dependencies": {
"postgresql-client": "2.5.9"
"postgresql-client": "2.5.5"
}
}

View File

@@ -1,4 +1,4 @@
FROM node:20
FROM node:18
WORKDIR /source
COPY . .

View File

@@ -5,16 +5,16 @@
"packages": {
"": {
"dependencies": {
"@neondatabase/serverless": "0.4.18",
"@neondatabase/serverless": "0.4.3",
"ws": "8.13.0"
}
},
"node_modules/@neondatabase/serverless": {
"version": "0.4.18",
"resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.4.18.tgz",
"integrity": "sha512-2TZnIyRGC/+0fjZ8TKCzaSTPUD94PM7NBGuantGZbUrbWyqBwGnUoRtdZAQ95qBKVHqORLVfymlv2NE+HQMFeA==",
"version": "0.4.3",
"resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.4.3.tgz",
"integrity": "sha512-U8tpuF5f0R5WRsciR7iaJ5S2h54DWa6Z6CEW+J4KgwyvRN3q3qDz0MibdfFXU0WqnRoi/9RSf/2XN4TfeaOCbQ==",
"dependencies": {
"@types/pg": "8.6.6"
"@types/pg": "^8.6.6"
}
},
"node_modules/@types/node": {

View File

@@ -1,7 +1,7 @@
{
"type": "module",
"dependencies": {
"@neondatabase/serverless": "0.4.18",
"@neondatabase/serverless": "0.4.3",
"ws": "8.13.0"
}
}

View File

@@ -2,7 +2,6 @@ import copy
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Optional
@@ -16,11 +15,7 @@ from fixtures.neon_fixtures import (
PortDistributor,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn
from pytest import FixtureRequest
@@ -422,7 +417,7 @@ def check_neon_works(
)
shutil.rmtree(repo_dir / "local_fs_remote_storage")
timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id)
pageserver_http.timeline_delete(tenant_id, timeline_id)
pageserver_http.timeline_create(pg_version, tenant_id, timeline_id)
pg_bin.run(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
@@ -449,7 +444,7 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
"""
with output.open("w") as stdout:
res = subprocess.run(
rv = subprocess.run(
[
"diff",
"--unified", # Make diff output more readable
@@ -461,53 +456,4 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
stdout=stdout,
)
differs = res.returncode != 0
# TODO: Remove after https://github.com/neondatabase/neon/pull/4425 is merged, and a couple of releases are made
if differs:
with tempfile.NamedTemporaryFile(mode="w") as tmp:
tmp.write(PR4425_ALLOWED_DIFF)
tmp.flush()
allowed = subprocess.run(
[
"diff",
"--unified", # Make diff output more readable
r"--ignore-matching-lines=^---", # Ignore diff headers
r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers
"--ignore-matching-lines=^@@", # Ignore diff blocks location
"--ignore-matching-lines=^ *$", # Ignore lines with only spaces
"--ignore-matching-lines=^ --.*", # Ignore the " --" lines for compatibility with PG14
"--ignore-blank-lines",
str(output),
str(tmp.name),
],
)
differs = allowed.returncode != 0
return differs
PR4425_ALLOWED_DIFF = """
--- /tmp/test_output/test_backward_compatibility[release-pg15]/compatibility_snapshot/dump.sql 2023-06-08 18:12:45.000000000 +0000
+++ /tmp/test_output/test_backward_compatibility[release-pg15]/dump.sql 2023-06-13 07:25:35.211733653 +0000
@@ -13,12 +13,20 @@
CREATE ROLE cloud_admin;
ALTER ROLE cloud_admin WITH SUPERUSER INHERIT CREATEROLE CREATEDB LOGIN REPLICATION BYPASSRLS;
+CREATE ROLE neon_superuser;
+ALTER ROLE neon_superuser WITH NOSUPERUSER INHERIT CREATEROLE CREATEDB NOLOGIN NOREPLICATION NOBYPASSRLS;
--
-- User Configurations
--
+--
+-- Role memberships
+--
+
+GRANT pg_read_all_data TO neon_superuser GRANTED BY cloud_admin;
+GRANT pg_write_all_data TO neon_superuser GRANTED BY cloud_admin;
"""
return rv.returncode != 0

View File

@@ -1,6 +1,7 @@
import shutil
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Tuple
import pytest
@@ -427,14 +428,14 @@ def poor_mans_du(
largest_layer = 0
smallest_layer = None
for tenant_id, timeline_id in timelines:
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
assert timeline_dir.exists(), f"timeline dir does not exist: {timeline_dir}"
total = 0
for file in timeline_dir.iterdir():
dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
assert dir.exists(), f"timeline dir does not exist: {dir}"
sum = 0
for file in dir.iterdir():
if "__" not in file.name:
continue
size = file.stat().st_size
total += size
sum += size
largest_layer = max(largest_layer, size)
if smallest_layer:
smallest_layer = min(smallest_layer, size)
@@ -442,8 +443,8 @@ def poor_mans_du(
smallest_layer = size
log.info(f"{tenant_id}/{timeline_id} => {file.name} {size}")
log.info(f"{tenant_id}/{timeline_id}: sum {total}")
total_on_disk += total
log.info(f"{tenant_id}/{timeline_id}: sum {sum}")
total_on_disk += sum
assert smallest_layer is not None or total_on_disk == 0 and largest_layer == 0
return (total_on_disk, largest_layer, smallest_layer or 0)
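For reference, the directory-size helper changed above reduces to a single pass over the timeline directory; a condensed sketch that takes the directory as a plain Path (the name layer_sizes is illustrative):

from pathlib import Path
from typing import Optional, Tuple


def layer_sizes(timeline_dir: Path) -> Tuple[int, int, int]:
    # Sum on-disk layer sizes; layer file names contain "__", everything else is skipped.
    total = 0
    largest = 0
    smallest: Optional[int] = None
    for file in timeline_dir.iterdir():
        if "__" not in file.name:
            continue
        size = file.stat().st_size
        total += size
        largest = max(largest, size)
        smallest = size if smallest is None else min(smallest, size)
    return total, largest, smallest or 0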

View File

@@ -1,5 +1,3 @@
import time
import pytest
from fixtures.neon_fixtures import NeonEnv
@@ -12,10 +10,9 @@ def test_hot_standby(neon_simple_env: NeonEnv):
branch_name="main",
endpoint_id="primary",
) as primary:
time.sleep(1)
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
primary_lsn = None
caught_up = False
cought_up = False
queries = [
"SHOW neon.timeline_id",
"SHOW neon.tenant_id",
@@ -59,7 +56,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
res = s_cur.fetchone()
assert res is not None
while not caught_up:
while not cought_up:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
res = secondary_cursor.fetchone()
@@ -69,7 +66,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
# due to e.g. autovacuum, but that shouldn't impact the content
# of the tables, so we check whether we've replayed up to at
# least after the commit of the `test` table.
caught_up = secondary_lsn >= primary_lsn
cought_up = secondary_lsn >= primary_lsn
# Explicit commit to flush any transient transaction-level state.
s_con.commit()

View File

@@ -14,11 +14,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
)
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import subprocess_capture
@@ -155,7 +151,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
".*files not bound to index_file.json, proceeding with their deletion.*"
)
timeline_delete_wait_completed(client, tenant, timeline)
client.timeline_delete(tenant, timeline)
# Importing correct backup works
import_tar(base_tar, wal_tar)

View File

@@ -24,13 +24,7 @@ def test_basic_eviction(
test_name="test_download_remote_layers_api",
)
env = neon_env_builder.init_start(
initial_tenant_conf={
# disable gc and compaction background loops because they perform on-demand downloads
"gc_period": "0s",
"compaction_period": "0s",
}
)
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
@@ -53,12 +47,7 @@ def test_basic_eviction(
client.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
# disable compute & sks to avoid on-demand downloads by walreceiver / getpage
endpoint.stop()
for sk in env.safekeepers:
sk.stop()
timeline_path = env.timeline_dir(tenant_id, timeline_id)
timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
)

View File

@@ -713,7 +713,9 @@ def test_ondemand_download_failure_to_replace(
# error message is not useful
pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=2)
actual_message = ".* ERROR .*layermap-replace-notfound"
actual_message = (
".* ERROR .*replacing downloaded layer into layermap failed because layer was not found"
)
assert env.pageserver.log_contains(actual_message) is not None
env.pageserver.allowed_errors.append(actual_message)

View File

@@ -20,7 +20,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
assert_timeline_detail_404,
wait_for_last_record_lsn,
wait_for_upload,
wait_until_tenant_active,
@@ -535,7 +535,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
"pitr_interval": "0s",
}
)
timeline_path = env.timeline_dir(tenant_id, timeline_id)
timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
client = env.pageserver.http_client()
@@ -597,11 +597,14 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
env.pageserver.allowed_errors.append(
".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping"
)
client.timeline_delete(tenant_id, timeline_id)
env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*")
env.pageserver.allowed_errors.append(
".*files not bound to index_file.json, proceeding with their deletion.*"
)
timeline_delete_wait_completed(client, tenant_id, timeline_id)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id))
assert not timeline_path.exists()

View File

@@ -632,14 +632,14 @@ def test_ignored_tenant_download_missing_layers(
# ignore the tenant and remove its layers
pageserver_http.tenant_ignore(tenant_id)
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
tenant_timeline_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
layers_removed = False
for dir_entry in timeline_dir.iterdir():
for dir_entry in tenant_timeline_dir.iterdir():
if dir_entry.name.startswith("00000"):
# Looks like a layer file. Remove it
dir_entry.unlink()
layers_removed = True
assert layers_removed, f"Found no layers for tenant {timeline_dir}"
assert layers_removed, f"Found no layers for tenant {tenant_timeline_dir}"
# now, load it from the local files and expect it to work due to remote storage restoration
pageserver_http.tenant_load(tenant_id=tenant_id)
@@ -688,14 +688,14 @@ def test_ignored_tenant_stays_broken_without_metadata(
# ignore the tenant and remove its metadata
pageserver_http.tenant_ignore(tenant_id)
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
tenant_timeline_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
metadata_removed = False
for dir_entry in timeline_dir.iterdir():
for dir_entry in tenant_timeline_dir.iterdir():
if dir_entry.name == "metadata":
# Looks like a layer file. Remove it
dir_entry.unlink()
metadata_removed = True
assert metadata_removed, f"Failed to find metadata file in {timeline_dir}"
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
env.pageserver.allowed_errors.append(
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"

View File

@@ -214,7 +214,9 @@ def switch_pg_to_new_pageserver(
endpoint.start()
timeline_to_detach_local_path = env.timeline_dir(tenant_id, timeline_id)
timeline_to_detach_local_path = (
env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
)
files_before_detach = os.listdir(timeline_to_detach_local_path)
assert (
"metadata" in files_before_detach
@@ -417,6 +419,8 @@ def test_tenant_relocation(
new_pageserver_http.tenant_attach(tenant_id)
# wait for tenant to finish attaching
tenant_status = new_pageserver_http.tenant_status(tenant_id=tenant_id)
assert tenant_status["state"]["slug"] in ["Attaching", "Active"]
wait_until(
number_of_iterations=10,
interval=1,

View File

@@ -11,12 +11,10 @@ from fixtures.neon_fixtures import (
wait_for_wal_insert_lsn,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.pg_version import PgVersion, xfail_on_postgres
from fixtures.types import Lsn, TenantId, TimelineId
@pytest.mark.xfail
def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
env = neon_simple_env
(tenant_id, _) = env.neon_cli.create_tenant()
@@ -45,16 +43,12 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
# we've disabled the autovacuum and checkpoint
# so background processes should not change the size.
# If this test will flake we should probably loosen the check
assert (
size == initial_size
), f"starting idle compute should not change the tenant size (Currently {size}, expected {initial_size})"
assert size == initial_size, "starting idle compute should not change the tenant size"
# the size should be the same, until we increase the size over the
# gc_horizon
size, inputs = http_client.tenant_size_and_modelinputs(tenant_id)
assert (
size == initial_size
), f"tenant_size should not be affected by shutdown of compute (Currently {size}, expected {initial_size})"
assert size == initial_size, "tenant_size should not be affected by shutdown of compute"
expected_inputs = {
"segments": [
@@ -323,7 +317,6 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa
size_debug_file.write(size_debug)
@pytest.mark.xfail
def test_single_branch_get_tenant_size_grows(
neon_env_builder: NeonEnvBuilder, test_output_dir: Path, pg_version: PgVersion
):
@@ -339,13 +332,13 @@ def test_single_branch_get_tenant_size_grows(
# inserts is larger than gc_horizon. for example 0x20000 here hid the fact
# that the next_gc_cutoff could be smaller than initdb_lsn, which will
# obviously lead to issues when calculating the size.
gc_horizon = 0x3BA00
gc_horizon = 0x38000
# it's a bit of a hack, but different versions of postgres have different
# amount of WAL generated for the same amount of data. so we need to
# adjust the gc_horizon accordingly.
if pg_version == PgVersion.V14:
gc_horizon = 0x4A000
gc_horizon = 0x40000
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
@@ -366,11 +359,11 @@ def test_single_branch_get_tenant_size_grows(
if current_lsn - initdb_lsn >= gc_horizon:
assert (
size >= prev_size
), f"tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"
), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
else:
assert (
size > prev_size
), f"tenant_size should grow, because we continue to add WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"
), "tenant_size should grow, because we continue to add WAL to initial snapshot size"
def get_current_consistent_size(
env: NeonEnv,
@@ -635,12 +628,12 @@ def test_get_tenant_size_with_multiple_branches(
size_debug_file_before.write(size_debug)
# teardown, delete branches, and the size should be going down
timeline_delete_wait_completed(http_client, tenant_id, first_branch_timeline_id)
http_client.timeline_delete(tenant_id, first_branch_timeline_id)
size_after_deleting_first = http_client.tenant_size(tenant_id)
assert size_after_deleting_first < size_after_thinning_branch
timeline_delete_wait_completed(http_client, tenant_id, second_branch_timeline_id)
http_client.timeline_delete(tenant_id, second_branch_timeline_id)
size_after_deleting_second = http_client.tenant_size(tenant_id)
assert size_after_deleting_second < size_after_deleting_first

View File

@@ -1,10 +1,6 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.utils import (
assert_tenant_state,
timeline_delete_wait_completed,
wait_until_tenant_active,
)
from fixtures.pageserver.utils import assert_tenant_state, wait_until_tenant_active
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
@@ -28,7 +24,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
def delete_all_timelines(tenant: TenantId):
timelines = [TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)]
for t in timelines:
timeline_delete_wait_completed(client, tenant, t)
client.timeline_delete(tenant, t)
# Create tenant, start compute
tenant, _ = env.neon_cli.create_tenant()

View File

@@ -21,7 +21,6 @@ from fixtures.neon_fixtures import (
RemoteStorageKind,
available_remote_storages,
)
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -214,7 +213,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
# Test (a subset of) pageserver global metrics
for metric in PAGESERVER_GLOBAL_METRICS:
ps_samples = ps_metrics.query_all(metric, {})
assert len(ps_samples) > 0, f"expected at least one sample for {metric}"
assert len(ps_samples) > 0
for sample in ps_samples:
labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()])
log.info(f"{sample.name}{{{labels}}} {sample.value}")
@@ -319,10 +318,9 @@ def test_pageserver_with_empty_tenants(
client.tenant_create(tenant_with_empty_timelines)
temp_timelines = client.timeline_list(tenant_with_empty_timelines)
for temp_timeline in temp_timelines:
timeline_delete_wait_completed(
client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
client.timeline_delete(
tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(

View File

@@ -257,7 +257,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
env.endpoints.stop_all()
env.pageserver.stop()
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
local_layer_truncated = None
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):

View File

@@ -17,10 +17,9 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
assert_timeline_detail_404,
wait_for_last_record_lsn,
wait_for_upload,
wait_timeline_detail_404,
wait_until_tenant_active,
wait_until_timeline_state,
)
@@ -84,7 +83,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
wait_until(
number_of_iterations=3,
interval=0.2,
func=lambda: timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id),
func=lambda: ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id),
)
assert not timeline_path.exists()
@@ -95,15 +94,15 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
) as exc:
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
assert exc.value.status_code == 404
wait_until(
number_of_iterations=3,
interval=0.2,
func=lambda: timeline_delete_wait_completed(
ps_http, env.initial_tenant, parent_timeline_id
),
)
# FIXME leaves tenant without timelines, should we prevent deletion of root timeline?
wait_until(
number_of_iterations=3,
interval=0.2,
func=lambda: ps_http.timeline_delete(env.initial_tenant, parent_timeline_id),
)
assert exc.value.status_code == 404
# Check that we didn't pick up the timeline again after restart.
# See https://github.com/neondatabase/neon/issues/3560
@@ -144,6 +143,7 @@ def test_delete_timeline_post_rm_failure(
ps_http.configure_failpoints((failpoint_name, "return"))
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
@@ -165,7 +165,13 @@ def test_delete_timeline_post_rm_failure(
# this should succeed
# this also checks that delete can be retried even when timeline is in Broken state
timeline_delete_wait_completed(ps_http, env.initial_tenant, env.initial_timeline)
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2)
with pytest.raises(PageserverApiException) as e:
ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
assert e.value.status_code == 404
env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*")
env.pageserver.allowed_errors.append(
f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*"
)
@@ -241,7 +247,13 @@ def test_timeline_resurrection_on_attach(
pass
# delete new timeline
timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=branch_timeline_id)
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id))
##### Stop the pageserver instance, erase all its data
env.endpoints.stop_all()
@@ -326,6 +338,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
)
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
@@ -344,15 +357,12 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
# Wait for tenant to finish loading.
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
try:
data = ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
log.debug(f"detail {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code != 404:
raise
else:
raise Exception("detail succeeded (it should return 404)")
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id)
)
assert (
not leaf_timeline_path.exists()
@@ -379,8 +389,13 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
assert env.initial_timeline is not None
for timeline_id in (intermediate_timeline_id, env.initial_timeline):
timeline_delete_wait_completed(
ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id
ps_http.timeline_delete(env.initial_tenant, timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id)
)
assert_prefix_empty(
@@ -404,27 +419,23 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
)
@pytest.mark.parametrize(
"stuck_failpoint",
["persist_deleted_index_part", "in_progress_delete"],
)
def test_concurrent_timeline_delete_stuck_on(
neon_env_builder: NeonEnvBuilder, stuck_failpoint: str
def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
neon_env_builder: NeonEnvBuilder,
):
"""
If delete is stuck, console will eventually retry deletion.
So we need to be sure that these requests won't interleave with each other.
In this test we check two places where we can spend a lot of time.
This is a regression test because there was a bug when DeletionGuard wasn't propagated
to the background task.
Ensure that when a retry comes while we're still stuck, the request gets an immediate error response,
signalling to console that it should retry later.
If we're stuck uploading the index file with the is_delete flag,
eventually console will hang up and retry.
If we're still stuck at the retry time, ensure that the retry
fails with status 500, signalling to console that it should retry
later.
Ideally, timeline_delete should return 202 Accepted and require
console to poll for completion, but, that would require changing
the API contract.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
test_name=f"concurrent_timeline_delete_stuck_on_{stuck_failpoint}",
test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload",
)
env = neon_env_builder.init_start()
@@ -434,14 +445,13 @@ def test_concurrent_timeline_delete_stuck_on(
ps_http = env.pageserver.http_client()
# make the first call sleep practically forever
ps_http.configure_failpoints((stuck_failpoint, "pause"))
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
ps_http.configure_failpoints((failpoint_name, "pause"))
def first_call(result_queue):
try:
log.info("first call start")
timeline_delete_wait_completed(
ps_http, env.initial_tenant, child_timeline_id, timeout=10
)
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
log.info("first call success")
result_queue.put("success")
except Exception:
@@ -456,17 +466,17 @@ def test_concurrent_timeline_delete_stuck_on(
def first_call_hit_failpoint():
assert env.pageserver.log_contains(
f".*{child_timeline_id}.*at failpoint {stuck_failpoint}"
f".*{child_timeline_id}.*at failpoint {failpoint_name}"
)
wait_until(50, 0.1, first_call_hit_failpoint)
# make the second call and assert behavior
log.info("second call start")
error_msg_re = "Timeline deletion is already in progress"
error_msg_re = "timeline deletion is already in progress"
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
assert second_call_err.value.status_code == 409
assert second_call_err.value.status_code == 500
env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
# the second call will try to transition the timeline into Stopping state as well
env.pageserver.allowed_errors.append(
@@ -474,12 +484,8 @@ def test_concurrent_timeline_delete_stuck_on(
)
log.info("second call failed as expected")
# ensure it is not 404 and stopping
detail = ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
assert detail["state"] == "Stopping"
# by now we know that the second call failed, let's ensure the first call will finish
ps_http.configure_failpoints((stuck_failpoint, "off"))
ps_http.configure_failpoints((failpoint_name, "off"))
result = first_call_result.get()
assert result == "success"
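The test above pauses the first deletion on a failpoint and checks how a concurrent retry behaves. Stripped of the environment setup, the general shape of the pattern, using the client and failpoint helpers that appear in this diff (error-message match, timeouts, and the function name are illustrative):

import queue
import threading

import pytest

from fixtures.pageserver.http import PageserverApiException


def concurrent_delete_shape(ps_http, tenant_id, timeline_id, failpoint: str):
    # Park the first deletion on a pause-style failpoint.
    ps_http.configure_failpoints((failpoint, "pause"))

    results: "queue.Queue[str]" = queue.Queue()

    def first_call():
        try:
            ps_http.timeline_delete(tenant_id, timeline_id, timeout=10)
            results.put("success")
        except Exception:
            results.put("failure")

    threading.Thread(target=first_call).start()

    # A retry arriving while the first call is still parked must fail fast
    # instead of racing the in-flight deletion.
    with pytest.raises(PageserverApiException, match="deletion is already in progress"):
        ps_http.timeline_delete(tenant_id, timeline_id)

    # Unpause the failpoint and let the original request finish.
    ps_http.configure_failpoints((failpoint, "off"))
    assert results.get() == "success"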
@@ -492,10 +498,8 @@ def test_concurrent_timeline_delete_stuck_on(
def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
"""
If the client hangs up before we start the index part upload but after deletion is scheduled
we mark it
If the client hangs up before we start the index part upload but after we mark it
deleted in local memory, a subsequent delete_timeline call should be able to do
another delete timeline operation.
This tests cancel safety up to the given failpoint.
@@ -511,18 +515,12 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
ps_http = env.pageserver.http_client()
failpoint_name = "persist_deleted_index_part"
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
ps_http.configure_failpoints((failpoint_name, "pause"))
with pytest.raises(requests.exceptions.Timeout):
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Timeline deletion is already in progress.*"
)
with pytest.raises(PageserverApiException, match="Timeline deletion is already in progress"):
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
# make sure the timeout was due to the failpoint
at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*"
@@ -554,7 +552,12 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_until(50, 0.1, first_request_finished)
# check that the timeline is gone
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id)
notfound_message = f"Timeline {env.initial_tenant}/{child_timeline_id} was not found"
env.pageserver.allowed_errors.append(".*" + notfound_message)
with pytest.raises(PageserverApiException, match=notfound_message) as exc:
ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
assert exc.value.status_code == 404
@pytest.mark.parametrize(
@@ -613,7 +616,12 @@ def test_timeline_delete_works_for_remote_smoke(
for timeline_id in reversed(timeline_ids):
# note that we need to finish previous deletion before scheduling next one
# otherwise we can get an "HasChildren" error if deletion is not fast enough (real_s3)
timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id)
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id))
assert_prefix_empty(
neon_env_builder,

View File

@@ -24,7 +24,6 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
assert_tenant_state,
timeline_delete_wait_completed,
wait_for_upload_queue_empty,
wait_until_tenant_active,
)
@@ -273,7 +272,7 @@ def test_timeline_initial_logical_size_calculation_cancellation(
if deletion_method == "tenant_detach":
client.tenant_detach(tenant_id)
elif deletion_method == "timeline_delete":
timeline_delete_wait_completed(client, tenant_id, timeline_id)
client.timeline_delete(tenant_id, timeline_id)
delete_timeline_success.put(True)
except PageserverApiException:
delete_timeline_success.put(False)

View File

@@ -31,11 +31,7 @@ from fixtures.neon_fixtures import (
SafekeeperPort,
available_remote_storages,
)
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar, start_in_background
@@ -552,15 +548,15 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
f"sk_id={sk.id} to flush {last_lsn}",
)
ps_http = env.pageserver.http_client()
pageserver_lsn = Lsn(ps_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
ps_cli = env.pageserver.http_client()
pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
lag = last_lsn - pageserver_lsn
log.info(
f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
)
endpoint.stop_and_destroy()
timeline_delete_wait_completed(ps_http, tenant_id, timeline_id)
ps_cli.timeline_delete(tenant_id, timeline_id)
# Also delete and manually create timeline on safekeepers -- this tests
# scenario of manual recovery on different set of safekeepers.
@@ -575,21 +571,11 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
# First terminate all safekeepers to prevent communication from unexpectedly
# advancing peer_horizon_lsn.
for sk in env.safekeepers:
cli = sk.http_client()
cli.timeline_delete_force(tenant_id, timeline_id)
# restart safekeeper to clear its in-memory state
sk.stop()
# wait for all potential in-flight pushes to the broker to arrive before starting
# safekeepers (even without sleep, it is very unlikely they are not
# delivered yet).
time.sleep(1)
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
sk.stop().start()
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
f_partial_path = (
Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
@@ -597,7 +583,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
shutil.copy(f_partial_saved, f_partial_path)
# recreate timeline on pageserver from scratch
ps_http.timeline_create(
ps_cli.timeline_create(
pg_version=PgVersion(pg_version),
tenant_id=tenant_id,
new_timeline_id=timeline_id,
@@ -612,7 +598,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
if elapsed > wait_lsn_timeout:
raise RuntimeError("Timed out waiting for WAL redo")
tenant_status = ps_http.tenant_status(tenant_id)
tenant_status = ps_cli.tenant_status(tenant_id)
if tenant_status["state"]["slug"] == "Loading":
log.debug(f"Tenant {tenant_id} is still loading, retrying")
else:

View File

@@ -1,5 +1,3 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
@@ -42,10 +40,7 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
# Kills one of the safekeepers and ensures that only the active ones are printed in the state.
def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder):
# Trigger WAL wait timeout faster
neon_env_builder.pageserver_config_override = """
wait_lsn_timeout = "1s"
tenant_config={walreceiver_connect_timeout = "2s", lagging_wal_timeout = "2s"}
"""
neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'"
# Have notable SK ids to ensure we check logs for their presence, not some other random numbers
neon_env_builder.safekeepers_id_start = 12345
neon_env_builder.num_safekeepers = 3
@@ -75,8 +70,6 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
stopped_safekeeper_id = stopped_safekeeper.id
log.info(f"Stopping safekeeper {stopped_safekeeper.id}")
stopped_safekeeper.stop()
# sleep until stopped safekeeper is removed from candidates
time.sleep(2)
# Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats.
insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert)