Compare commits

..

1 Commits

Author SHA1 Message Date
Heikki Linnakangas
7e175400ab Reduce noise from moto GET/PUT operations
Moto prints messages like this:

    127.0.0.1 - - [07/Oct/2024 12:35:16] "PUT /bucket-name/path?x-id=PutObject HTTP/1.1" 200 -

After the root logger adds its context information, this is what
actually gets printed to the log:

    2024-10-07 22:35:16.371 INFO [_internal.py:97] 127.0.0.1 - - [07/Oct/2024 22:35:16] "PUT /bucket-name/path?x-id=PutObject HTTP/1.1" 200 -

That's very verbose. Remove the hostname and the extra timestamp, to
make it a little less verbose. With this PR, the final output looks
like this:

    2024-10-07 22:35:16.371 INFO [_internal.py:97] "PUT /bucket-name/path?x-id=PutObject HTTP/1.1" 200 -
2024-10-09 18:53:08 +03:00
93 changed files with 1055 additions and 3011 deletions

View File

@@ -183,7 +183,7 @@ runs:
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Store Allure test stat in the DB (new)
if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}

View File

@@ -88,7 +88,7 @@ runs:
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -euxo pipefail {0}
@@ -218,9 +218,6 @@ runs:
name: compatibility-snapshot-${{ runner.arch }}-${{ inputs.build_type }}-pg${{ inputs.pg_version }}
# Directory is created by test_compatibility.py::test_create_snapshot, keep the path in sync with the test
path: /tmp/test_output/compatibility_snapshot_pg${{ inputs.pg_version }}/
# The lack of compatibility snapshot shouldn't fail the job
# (for example if we didn't run the test for non build-and-test workflow)
skip-if-does-not-exist: true
- name: Upload test results
if: ${{ !cancelled() }}

View File

@@ -7,10 +7,6 @@ inputs:
path:
description: "A directory or file to upload"
required: true
skip-if-does-not-exist:
description: "Allow to skip if path doesn't exist, fail otherwise"
default: false
required: false
prefix:
description: "S3 prefix. Default is '${GITHUB_SHA}/${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false
@@ -19,12 +15,10 @@ runs:
using: "composite"
steps:
- name: Prepare artifact
id: prepare-artifact
shell: bash -euxo pipefail {0}
env:
SOURCE: ${{ inputs.path }}
ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
SKIP_IF_DOES_NOT_EXIST: ${{ inputs.skip-if-does-not-exist }}
run: |
mkdir -p $(dirname $ARCHIVE)
@@ -39,22 +33,14 @@ runs:
elif [ -f ${SOURCE} ]; then
time tar -cf ${ARCHIVE} --zstd ${SOURCE}
elif ! ls ${SOURCE} > /dev/null 2>&1; then
if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then
echo 'SKIPPED=true' >> $GITHUB_OUTPUT
exit 0
else
echo >&2 "${SOURCE} does not exist"
exit 2
fi
echo >&2 "${SOURCE} does not exist"
exit 2
else
echo >&2 "${SOURCE} is neither a directory nor a file, do not know how to handle it"
exit 3
fi
echo 'SKIPPED=false' >> $GITHUB_OUTPUT
- name: Upload artifact
if: ${{ steps.prepare-artifact.outputs.SKIPPED == 'false' }}
shell: bash -euxo pipefail {0}
env:
SOURCE: ${{ inputs.path }}

View File

@@ -124,28 +124,28 @@ jobs:
uses: actions/cache@v4
with:
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@v4
with:
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@v4
with:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
- name: Cache postgres v17 build
id: cache_pg_17
uses: actions/cache@v4
with:
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'

View File

@@ -19,16 +19,9 @@ defaults:
run:
shell: bash -euo pipefail {0}
# The initial idea was to prevent the waste of resources by not re-building the `build-tools` image
# for the same tag in parallel workflow runs, and queue them to be skipped once we have
# the first image pushed to Docker registry, but GitHub's concurrency mechanism is not working as expected.
# GitHub can't have more than 1 job in a queue and removes the previous one, it causes failures if the dependent jobs.
#
# Ref https://github.com/orgs/community/discussions/41518
#
# concurrency:
# group: build-build-tools-image-${{ inputs.image-tag }}
# cancel-in-progress: false
concurrency:
group: build-build-tools-image-${{ inputs.image-tag }}
cancel-in-progress: false
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
@@ -43,7 +36,6 @@ jobs:
strategy:
matrix:
debian-version: [ bullseye, bookworm ]
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
@@ -82,22 +74,22 @@ jobs:
- uses: docker/build-push-action@v6
with:
file: Dockerfile.build-tools
context: .
provenance: false
push: true
pull: true
build-args: |
DEBIAN_VERSION=${{ matrix.debian-version }}
cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.debian-version }}-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0}-{1},mode=max', matrix.debian-version, matrix.arch) || '' }}
tags: |
neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.debian-version }}-${{ matrix.arch }}
file: Dockerfile.build-tools
cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0},mode=max', matrix.arch) || '' }}
tags: neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.arch }}
merge-images:
needs: [ build-image ]
runs-on: ubuntu-22.04
env:
IMAGE_TAG: ${{ inputs.image-tag }}
steps:
- uses: docker/login-action@v3
with:
@@ -105,17 +97,7 @@ jobs:
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Create multi-arch image
env:
DEFAULT_DEBIAN_VERSION: bullseye
IMAGE_TAG: ${{ inputs.image-tag }}
run: |
for debian_version in bullseye bookworm; do
tags=("-t" "neondatabase/build-tools:${IMAGE_TAG}-${debian_version}")
if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
tags+=("-t" "neondatabase/build-tools:${IMAGE_TAG}")
fi
docker buildx imagetools create "${tags[@]}" \
neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-x64 \
neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-arm64
done
docker buildx imagetools create -t neondatabase/build-tools:${IMAGE_TAG} \
neondatabase/build-tools:${IMAGE_TAG}-x64 \
neondatabase/build-tools:${IMAGE_TAG}-arm64

View File

@@ -92,7 +92,7 @@ jobs:
needs: [ check-permissions, build-build-tools-image ]
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -106,7 +106,7 @@ jobs:
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
@@ -181,7 +181,7 @@ jobs:
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -193,15 +193,16 @@ jobs:
with:
submodules: true
- name: Cache cargo deps
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
!~/.cargo/registry/src
~/.cargo/git
target
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
# Disabled for now
# - name: Restore cargo deps cache
# id: cache_cargo
# uses: actions/cache@v4
# with:
# path: |
# !~/.cargo/registry/src
# ~/.cargo/git/
# target/
# key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-clippy-${{ hashFiles('rust-toolchain.toml') }}-${{ hashFiles('Cargo.lock') }}
# Some of our rust modules use FFI and need those to be checked
- name: Get postgres headers
@@ -261,7 +262,7 @@ jobs:
uses: ./.github/workflows/_build-and-test-locally.yml
with:
arch: ${{ matrix.arch }}
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
# Run tests on all Postgres versions in release builds and only on the latest version in debug builds
@@ -276,7 +277,7 @@ jobs:
needs: [ check-permissions, build-build-tools-image ]
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -289,7 +290,7 @@ jobs:
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
@@ -309,7 +310,7 @@ jobs:
needs: [ check-permissions, build-and-test-locally, build-build-tools-image, get-benchmarks-durations ]
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -367,7 +368,7 @@ jobs:
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -415,7 +416,7 @@ jobs:
needs: [ check-permissions, build-build-tools-image, build-and-test-locally ]
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -559,16 +560,15 @@ jobs:
ADDITIONAL_RUSTFLAGS=${{ matrix.arch == 'arm64' && '-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1' || '' }}
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-bookworm
DEBIAN_VERSION=bookworm
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
provenance: false
push: true
pull: true
file: Dockerfile
cache-from: type=registry,ref=cache.neon.build/neon:cache-bookworm-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon:cache-{0}-{1},mode=max', 'bookworm', matrix.arch) || '' }}
cache-from: type=registry,ref=cache.neon.build/neon:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon:cache-{0},mode=max', matrix.arch) || '' }}
tags: |
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm-${{ matrix.arch }}
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
neon-image:
needs: [ neon-image-arch, tag ]
@@ -583,9 +583,8 @@ jobs:
- name: Create multi-arch image
run: |
docker buildx imagetools create -t neondatabase/neon:${{ needs.tag.outputs.build-tag }} \
-t neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm \
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm-x64 \
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm-arm64
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-x64 \
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-arm64
- uses: docker/login-action@v3
with:
@@ -606,16 +605,17 @@ jobs:
version:
# Much data was already generated on old PG versions with bullseye's
# libraries, the locales of which can cause data incompatibilities.
# However, new PG versions should be build on newer images,
# as that reduces the support burden of old and ancient distros.
# However, new PG versions should check if they can be built on newer
# images, as that reduces the support burden of old and ancient
# distros.
- pg: v14
debian: bullseye
debian: bullseye-slim
- pg: v15
debian: bullseye
debian: bullseye-slim
- pg: v16
debian: bullseye
debian: bullseye-slim
- pg: v17
debian: bookworm
debian: bookworm-slim
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
@@ -660,16 +660,16 @@ jobs:
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
DEBIAN_FLAVOR=${{ matrix.version.debian }}
provenance: false
push: true
pull: true
file: compute/Dockerfile.compute-node
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }}
tags: |
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-${{ matrix.arch }}
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
- name: Build neon extensions test image
if: matrix.version.pg == 'v16'
@@ -680,17 +680,17 @@ jobs:
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
DEBIAN_FLAVOR=${{ matrix.version.debian }}
provenance: false
push: true
pull: true
file: compute/Dockerfile.compute-node
target: neon-pg-ext-test
cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }}
tags: |
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.version.debian }}-${{ matrix.arch }}
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }}
- name: Build compute-tools image
# compute-tools are Postgres independent, so build it only once
@@ -705,16 +705,14 @@ jobs:
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
DEBIAN_FLAVOR=${{ matrix.version.debian }}
provenance: false
push: true
pull: true
file: compute/Dockerfile.compute-node
cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-tools-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
tags: |
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-${{ matrix.arch }}
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
compute-node-image:
needs: [ compute-node-image-arch, tag ]
@@ -722,16 +720,7 @@ jobs:
strategy:
matrix:
version:
# see the comment for `compute-node-image-arch` job
- pg: v14
debian: bullseye
- pg: v15
debian: bullseye
- pg: v16
debian: bullseye
- pg: v17
debian: bookworm
version: [ v14, v15, v16, v17 ]
steps:
- uses: docker/login-action@v3
@@ -741,26 +730,23 @@ jobs:
- name: Create multi-arch compute-node image
run: |
docker buildx imagetools create -t neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-t neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
docker buildx imagetools create -t neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-x64 \
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-arm64
- name: Create multi-arch neon-test-extensions image
if: matrix.version.pg == 'v16'
if: matrix.version == 'v16'
run: |
docker buildx imagetools create -t neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-t neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
docker buildx imagetools create -t neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-x64 \
neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-arm64
- name: Create multi-arch compute-tools image
if: matrix.version.pg == 'v16'
if: matrix.version == 'v17'
run: |
docker buildx imagetools create -t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }} \
-t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-x64 \
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-arm64
- uses: docker/login-action@v3
with:
@@ -768,13 +754,13 @@ jobs:
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
- name: Push multi-arch compute-node-${{ matrix.version.pg }} image to ECR
- name: Push multi-arch compute-node-${{ matrix.version }} image to ECR
run: |
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}
- name: Push multi-arch compute-tools image to ECR
if: matrix.version.pg == 'v16'
if: matrix.version == 'v17'
run: |
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{ needs.tag.outputs.build-tag }} \
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}
@@ -785,16 +771,7 @@ jobs:
strategy:
fail-fast: false
matrix:
version:
# see the comment for `compute-node-image-arch` job
- pg: v14
debian: bullseye
- pg: v15
debian: bullseye
- pg: v16
debian: bullseye
- pg: v17
debian: bookworm
version: [ v14, v15, v16, v17 ]
env:
VM_BUILDER_VERSION: v0.35.0
@@ -816,18 +793,18 @@ jobs:
# it won't have the proper authentication (written at v0.6.0)
- name: Pulling compute-node image
run: |
docker pull neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
docker pull neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}
- name: Build vm image
run: |
./vm-builder \
-spec=compute/vm-image-spec-${{ matrix.version.debian }}.yaml \
-src=neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-dst=neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
-spec=compute/vm-image-spec.yaml \
-src=neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
-dst=neondatabase/vm-compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}
- name: Pushing vm-compute-node image
run: |
docker push neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
docker push neondatabase/vm-compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}
test-images:
needs: [ check-permissions, tag, neon-image, compute-node-image ]

View File

@@ -155,7 +155,7 @@ jobs:
github.ref_name == 'main'
runs-on: [ self-hosted, large ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

View File

@@ -55,7 +55,7 @@ jobs:
runs-on: ubuntu-22.04
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -150,7 +150,7 @@ jobs:
runs-on: ubuntu-22.04
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

View File

@@ -71,6 +71,7 @@ jobs:
steps:
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -93,22 +94,8 @@ jobs:
az acr login --name=neoneastus2
- name: Tag build-tools with `${{ env.TO_TAG }}` in Docker Hub, ECR, and ACR
env:
DEFAULT_DEBIAN_VERSION: bullseye
run: |
for debian_version in bullseye bookworm; do
tags=()
tags+=("-t" "neondatabase/build-tools:${TO_TAG}-${debian_version}")
tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}-${debian_version}")
tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}-${debian_version}")
if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
tags+=("-t" "neondatabase/build-tools:${TO_TAG}")
tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}")
tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}")
fi
docker buildx imagetools create "${tags[@]}" \
neondatabase/build-tools:${FROM_TAG}-${debian_version}
done
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG} \
-t neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG} \
-t neondatabase/build-tools:${TO_TAG} \
neondatabase/build-tools:${FROM_TAG}

View File

@@ -33,7 +33,7 @@ jobs:
actions: read
steps:
- name: Export GH Workflow Stats
uses: neondatabase/gh-workflow-stats-action@v0.1.4
uses: fedordikarev/gh-workflow-stats-action@v0.1.2
with:
DB_URI: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
DB_TABLE: "gh_workflow_stats_neon"

View File

@@ -1,6 +1,5 @@
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
/storage_controller @neondatabase/storage
/storage_scrubber @neondatabase/storage
/libs/pageserver_api/ @neondatabase/storage
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
/libs/remote_storage/ @neondatabase/storage

18
Cargo.lock generated
View File

@@ -1820,7 +1820,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47"
dependencies = [
"base16ct 0.2.0",
"base64ct",
"crypto-bigint 0.5.5",
"digest",
"ff 0.13.0",
@@ -1830,8 +1829,6 @@ dependencies = [
"pkcs8 0.10.2",
"rand_core 0.6.4",
"sec1 0.7.3",
"serde_json",
"serdect",
"subtle",
"zeroize",
]
@@ -4040,8 +4037,6 @@ dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
"serde",
"serde_json",
]
[[package]]
@@ -5261,7 +5256,6 @@ dependencies = [
"der 0.7.8",
"generic-array",
"pkcs8 0.10.2",
"serdect",
"subtle",
"zeroize",
]
@@ -5516,16 +5510,6 @@ dependencies = [
"syn 2.0.52",
]
[[package]]
name = "serdect"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a84f14a19e9a014bb9f4512488d9829a68e04ecabffb0f9904cd1ace94598177"
dependencies = [
"base16ct 0.2.0",
"serde",
]
[[package]]
name = "sha1"
version = "0.10.5"
@@ -7318,7 +7302,6 @@ dependencies = [
"num-traits",
"once_cell",
"parquet",
"postgres-types",
"prettyplease",
"proc-macro2",
"prost",
@@ -7343,7 +7326,6 @@ dependencies = [
"time",
"time-macros",
"tokio",
"tokio-postgres",
"tokio-stream",
"tokio-util",
"toml_edit",

View File

@@ -7,8 +7,6 @@ ARG IMAGE=build-tools
ARG TAG=pinned
ARG DEFAULT_PG_VERSION=17
ARG STABLE_PG_VERSION=16
ARG DEBIAN_VERSION=bullseye
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
# Build Postgres
FROM $REPOSITORY/$IMAGE:$TAG AS pg-build
@@ -59,7 +57,7 @@ RUN set -e \
# Build final image
#
FROM debian:${DEBIAN_FLAVOR}
FROM debian:bullseye-slim
ARG DEFAULT_PG_VERSION
WORKDIR /data

View File

@@ -1,7 +1,12 @@
ARG DEBIAN_VERSION=bullseye
FROM debian:bullseye-slim
FROM debian:${DEBIAN_VERSION}-slim
ARG DEBIAN_VERSION
# Use ARG as a build-time environment variable here to allow.
# It's not supposed to be set outside.
# Alternatively it can be obtained using the following command
# ```
# . /etc/os-release && echo "${VERSION_CODENAME}"
# ```
ARG DEBIAN_VERSION_CODENAME=bullseye
# Add nonroot user
RUN useradd -ms /bin/bash nonroot -b /home
@@ -37,14 +42,14 @@ RUN set -e \
libseccomp-dev \
libsqlite3-dev \
libssl-dev \
$([[ "${DEBIAN_VERSION}" = "bullseye" ]] && libstdc++-10-dev || libstdc++-11-dev) \
libstdc++-10-dev \
libtool \
libxml2-dev \
libxmlsec1-dev \
libxxhash-dev \
lsof \
make \
netcat-openbsd \
netcat \
net-tools \
openssh-client \
parallel \
@@ -73,7 +78,7 @@ RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/
# LLVM
ENV LLVM_VERSION=18
RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& echo "deb http://apt.llvm.org/${DEBIAN_VERSION}/ llvm-toolchain-${DEBIAN_VERSION}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& echo "deb http://apt.llvm.org/${DEBIAN_VERSION_CODENAME}/ llvm-toolchain-${DEBIAN_VERSION_CODENAME}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& apt update \
&& apt install -y clang-${LLVM_VERSION} llvm-${LLVM_VERSION} \
&& bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \
@@ -81,7 +86,7 @@ RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
# Install docker
RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION} stable" > /etc/apt/sources.list.d/docker.list \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION_CODENAME} stable" > /etc/apt/sources.list.d/docker.list \
&& apt update \
&& apt install -y docker-ce docker-ce-cli \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

View File

@@ -3,8 +3,7 @@ ARG REPOSITORY=neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG BUILD_TAG
ARG DEBIAN_VERSION=bullseye
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
ARG DEBIAN_FLAVOR=bullseye-slim
#########################################################################################
#
@@ -12,23 +11,20 @@ ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
#
#########################################################################################
FROM debian:$DEBIAN_FLAVOR AS build-deps
ARG DEBIAN_VERSION
ARG DEBIAN_FLAVOR
RUN case $DEBIAN_VERSION in \
RUN case $DEBIAN_FLAVOR in \
# Version-specific installs for Bullseye (PG14-PG16):
# The h3_pg extension needs a cmake 3.20+, but Debian bullseye has 3.18.
# Install newer version (3.25) from backports.
bullseye) \
bullseye*) \
echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
VERSION_INSTALLS="cmake/bullseye-backports cmake-data/bullseye-backports"; \
;; \
# Version-specific installs for Bookworm (PG17):
bookworm) \
bookworm*) \
VERSION_INSTALLS="cmake"; \
;; \
*) \
echo "Unknown Debian version ${DEBIAN_VERSION}" && exit 1 \
;; \
esac && \
apt update && \
apt install --no-install-recommends -y git autoconf automake libtool build-essential bison flex libreadline-dev \
@@ -113,30 +109,13 @@ RUN apt update && \
libcgal-dev libgdal-dev libgmp-dev libmpfr-dev libopenscenegraph-dev libprotobuf-c-dev \
protobuf-c-compiler xsltproc
# Postgis 3.5.0 requires SFCGAL 1.4+
#
# It would be nice to update all versions together, but we must solve the SFCGAL dependency first.
# SFCGAL > 1.3 requires CGAL > 5.2, Bullseye's libcgal-dev is 5.2
# and also we must check backward compatibility with older versions of PostGIS.
#
# Use new version only for v17
RUN case "${PG_VERSION}" in \
"v17") \
export SFCGAL_VERSION=1.4.1 \
export SFCGAL_CHECKSUM=1800c8a26241588f11cddcf433049e9b9aea902e923414d2ecef33a3295626c3 \
;; \
"v14" | "v15" | "v16") \
export SFCGAL_VERSION=1.3.10 \
export SFCGAL_CHECKSUM=4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
RUN case "${PG_VERSION}" in "v17") \
mkdir -p /sfcgal && \
wget https://gitlab.com/sfcgal/SFCGAL/-/archive/v${SFCGAL_VERSION}/SFCGAL-v${SFCGAL_VERSION}.tar.gz -O SFCGAL.tar.gz && \
echo "${SFCGAL_CHECKSUM} SFCGAL.tar.gz" | sha256sum --check && \
echo "Postgis doensn't yet support PG17 (needs 3.4.3, if not higher)" && exit 0;; \
esac && \
wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz -O SFCGAL.tar.gz && \
echo "4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 SFCGAL.tar.gz" | sha256sum --check && \
mkdir sfcgal-src && cd sfcgal-src && tar xzf ../SFCGAL.tar.gz --strip-components=1 -C . && \
cmake -DCMAKE_BUILD_TYPE=Release . && make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
@@ -144,27 +123,15 @@ RUN case "${PG_VERSION}" in \
ENV PATH="/usr/local/pgsql/bin:$PATH"
# Postgis 3.5.0 supports v17
RUN case "${PG_VERSION}" in \
"v17") \
export POSTGIS_VERSION=3.5.0 \
export POSTGIS_CHECKSUM=ca698a22cc2b2b3467ac4e063b43a28413f3004ddd505bdccdd74c56a647f510 \
;; \
"v14" | "v15" | "v16") \
export POSTGIS_VERSION=3.3.3 \
export POSTGIS_CHECKSUM=74eb356e3f85f14233791013360881b6748f78081cc688ff9d6f0f673a762d13 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
RUN case "${PG_VERSION}" in "v17") \
echo "Postgis doensn't yet support PG17 (needs 3.4.3, if not higher)" && exit 0;; \
esac && \
wget https://download.osgeo.org/postgis/source/postgis-${POSTGIS_VERSION}.tar.gz -O postgis.tar.gz && \
echo "${POSTGIS_CHECKSUM} postgis.tar.gz" | sha256sum --check && \
wget https://download.osgeo.org/postgis/source/postgis-3.3.3.tar.gz -O postgis.tar.gz && \
echo "74eb356e3f85f14233791013360881b6748f78081cc688ff9d6f0f673a762d13 postgis.tar.gz" | sha256sum --check && \
mkdir postgis-src && cd postgis-src && tar xzf ../postgis.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
./autogen.sh && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
cd extensions/postgis && \
make clean && \
@@ -185,27 +152,11 @@ RUN case "${PG_VERSION}" in \
cp /usr/local/pgsql/share/extension/address_standardizer.control /extensions/postgis && \
cp /usr/local/pgsql/share/extension/address_standardizer_data_us.control /extensions/postgis
# Uses versioned libraries, i.e. libpgrouting-3.4
# and may introduce function signature changes between releases
# i.e. release 3.5.0 has new signature for pg_dijkstra function
#
# Use new version only for v17
# last release v3.6.2 - Mar 30, 2024
RUN case "${PG_VERSION}" in \
"v17") \
export PGROUTING_VERSION=3.6.2 \
export PGROUTING_CHECKSUM=f4a1ed79d6f714e52548eca3bb8e5593c6745f1bde92eb5fb858efd8984dffa2 \
;; \
"v14" | "v15" | "v16") \
export PGROUTING_VERSION=3.4.2 \
export PGROUTING_CHECKSUM=cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/pgRouting/pgrouting/archive/v${PGROUTING_VERSION}.tar.gz -O pgrouting.tar.gz && \
echo "${PGROUTING_CHECKSUM} pgrouting.tar.gz" | sha256sum --check && \
wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
echo "cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e pgrouting.tar.gz" | sha256sum --check && \
mkdir pgrouting-src && cd pgrouting-src && tar xzf ../pgrouting.tar.gz --strip-components=1 -C . && \
mkdir build && cd build && \
cmake -DCMAKE_BUILD_TYPE=Release .. && \
@@ -264,9 +215,10 @@ FROM build-deps AS h3-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# last release v4.1.0 - Jan 18, 2023
RUN mkdir -p /h3/usr/ && \
RUN case "${PG_VERSION}" in "v17") \
mkdir -p /h3/usr/ && \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz && \
echo "ec99f1f5974846bde64f4513cf8d2ea1b8d172d2218ab41803bf6a63532272bc h3.tar.gz" | sha256sum --check && \
mkdir h3-src && cd h3-src && tar xzf ../h3.tar.gz --strip-components=1 -C . && \
@@ -277,9 +229,10 @@ RUN mkdir -p /h3/usr/ && \
cp -R /h3/usr / && \
rm -rf build
# not version-specific
# last release v4.1.3 - Jul 26, 2023
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.3.tar.gz -O h3-pg.tar.gz && \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.3.tar.gz -O h3-pg.tar.gz && \
echo "5c17f09a820859ffe949f847bebf1be98511fb8f1bd86f94932512c00479e324 h3-pg.tar.gz" | sha256sum --check && \
mkdir h3-pg-src && cd h3-pg-src && tar xzf ../h3-pg.tar.gz --strip-components=1 -C . && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
@@ -298,10 +251,11 @@ FROM build-deps AS unit-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# last release 7.9 - Sep 15, 2024
RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.9.tar.gz -O postgresql-unit.tar.gz && \
echo "e46de6245dcc8b2c2ecf29873dbd43b2b346773f31dd5ce4b8315895a052b456 postgresql-unit.tar.gz" | sha256sum --check && \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -O postgresql-unit.tar.gz && \
echo "411d05beeb97e5a4abf17572bfcfbb5a68d98d1018918feff995f6ee3bb03e79 postgresql-unit.tar.gz" | sha256sum --check && \
mkdir postgresql-unit-src && cd postgresql-unit-src && tar xzf ../postgresql-unit.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -348,10 +302,12 @@ FROM build-deps AS pgjwt-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# doesn't use releases, last commit f3d82fd - Mar 2, 2023
RUN wget https://github.com/michelp/pgjwt/archive/f3d82fd30151e754e19ce5d6a06c71c20689ce3d.tar.gz -O pgjwt.tar.gz && \
echo "dae8ed99eebb7593b43013f6532d772b12dfecd55548d2673f2dfd0163f6d2b9 pgjwt.tar.gz" | sha256sum --check && \
# 9742dab1b2f297ad3811120db7b21451bca2d3c9 made on 13/11/2021
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b21451bca2d3c9.tar.gz -O pgjwt.tar.gz && \
echo "cfdefb15007286f67d3d45510f04a6a7a495004be5b3aecb12cda667e774203f pgjwt.tar.gz" | sha256sum --check && \
mkdir pgjwt-src && cd pgjwt-src && tar xzf ../pgjwt.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control
@@ -386,9 +342,10 @@ FROM build-deps AS pg-hashids-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# last release v1.2.1 -Jan 12, 2018
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
echo "74576b992d9277c92196dd8d816baa2cc2d8046fe102f3dcd7f3c3febed6822a pg_hashids.tar.gz" | sha256sum --check && \
mkdir pg_hashids-src && cd pg_hashids-src && tar xzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -448,9 +405,10 @@ FROM build-deps AS ip4r-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# last release v2.4.2 - Jul 29, 2023
RUN wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz -O ip4r.tar.gz && \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz -O ip4r.tar.gz && \
echo "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b ip4r.tar.gz" | sha256sum --check && \
mkdir ip4r-src && cd ip4r-src && tar xzf ../ip4r.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -467,9 +425,10 @@ FROM build-deps AS prefix-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# last release v1.2.10 - Jul 5, 2023
RUN wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.10.tar.gz -O prefix.tar.gz && \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.10.tar.gz -O prefix.tar.gz && \
echo "4342f251432a5f6fb05b8597139d3ccde8dcf87e8ca1498e7ee931ca057a8575 prefix.tar.gz" | sha256sum --check && \
mkdir prefix-src && cd prefix-src && tar xzf ../prefix.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -486,9 +445,10 @@ FROM build-deps AS hll-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# last release v2.18 - Aug 29, 2023
RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar.gz -O hll.tar.gz && \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar.gz -O hll.tar.gz && \
echo "e2f55a6f4c4ab95ee4f1b4a2b73280258c5136b161fe9d059559556079694f0e hll.tar.gz" | sha256sum --check && \
mkdir hll-src && cd hll-src && tar xzf ../hll.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -699,10 +659,11 @@ FROM build-deps AS pg-roaringbitmap-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# last release v0.5.4 - Jun 28, 2022
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions is not supported yet by pg_roaringbitmap. Quit" && exit 0;; \
esac && \
wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
echo "b75201efcb1c2d1b014ec4ae6a22769cc7a224e6e406a587f5784a37b6b5a2aa pg_roaringbitmap.tar.gz" | sha256sum --check && \
mkdir pg_roaringbitmap-src && cd pg_roaringbitmap-src && tar xzf ../pg_roaringbitmap.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -719,27 +680,12 @@ FROM build-deps AS pg-semver-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# Release 0.40.0 breaks backward compatibility with previous versions
# see release note https://github.com/theory/pg-semver/releases/tag/v0.40.0
# Use new version only for v17
#
# last release v0.40.0 - Jul 22, 2024
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in \
"v17") \
export SEMVER_VERSION=0.40.0 \
export SEMVER_CHECKSUM=3e50bcc29a0e2e481e7b6d2bc937cadc5f5869f55d983b5a1aafeb49f5425cfc \
;; \
"v14" | "v15" | "v16") \
export SEMVER_VERSION=0.32.1 \
export SEMVER_CHECKSUM=fbdaf7512026d62eec03fad8687c15ed509b6ba395bff140acd63d2e4fbe25d7 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 is not supported yet by pg_semver. Quit" && exit 0;; \
esac && \
wget https://github.com/theory/pg-semver/archive/refs/tags/v${SEMVER_VERSION}.tar.gz -O pg_semver.tar.gz && \
echo "${SEMVER_CHECKSUM} pg_semver.tar.gz" | sha256sum --check && \
wget https://github.com/theory/pg-semver/archive/refs/tags/v0.32.1.tar.gz -O pg_semver.tar.gz && \
echo "fbdaf7512026d62eec03fad8687c15ed509b6ba395bff140acd63d2e4fbe25d7 pg_semver.tar.gz" | sha256sum --check && \
mkdir pg_semver-src && cd pg_semver-src && tar xzf ../pg_semver.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -1095,6 +1041,7 @@ RUN cd compute_tools && mold -run cargo build --locked --profile release-line-de
#########################################################################################
FROM debian:$DEBIAN_FLAVOR AS compute-tools-image
ARG DEBIAN_FLAVOR
COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
@@ -1105,6 +1052,7 @@ COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compu
#########################################################################################
FROM debian:$DEBIAN_FLAVOR AS pgbouncer
ARG DEBIAN_FLAVOR
RUN set -e \
&& apt-get update \
&& apt-get install --no-install-recommends -y \
@@ -1259,7 +1207,7 @@ ENV PGDATABASE=postgres
#
#########################################################################################
FROM debian:$DEBIAN_FLAVOR
ARG DEBIAN_VERSION
ARG DEBIAN_FLAVOR
# Add user postgres
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
echo "postgres:test_console_pass" | chpasswd && \
@@ -1307,22 +1255,19 @@ RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/loca
RUN apt update && \
case $DEBIAN_VERSION in \
case $DEBIAN_FLAVOR in \
# Version-specific installs for Bullseye (PG14-PG16):
# libicu67, locales for collations (including ICU and plpgsql_check)
# libgdal28, libproj19 for PostGIS
bullseye) \
bullseye*) \
VERSION_INSTALLS="libicu67 libgdal28 libproj19"; \
;; \
# Version-specific installs for Bookworm (PG17):
# libicu72, locales for collations (including ICU and plpgsql_check)
# libgdal32, libproj25 for PostGIS
bookworm) \
bookworm*) \
VERSION_INSTALLS="libicu72 libgdal32 libproj25"; \
;; \
*) \
echo "Unknown Debian version ${DEBIAN_VERSION}" && exit 1 \
;; \
esac && \
apt install --no-install-recommends -y \
gdb \

View File

@@ -1,126 +0,0 @@
# Supplemental file for neondatabase/autoscaling's vm-builder, for producing the VM compute image.
---
commands:
- name: cgconfigparser
user: root
sysvInitAction: sysinit
shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664'
# restrict permissions on /neonvm/bin/resize-swap, because we grant access to compute_ctl for
# running it as root.
- name: chmod-resize-swap
user: root
sysvInitAction: sysinit
shell: 'chmod 711 /neonvm/bin/resize-swap'
- name: chmod-set-disk-quota
user: root
sysvInitAction: sysinit
shell: 'chmod 711 /neonvm/bin/set-disk-quota'
- name: pgbouncer
user: postgres
sysvInitAction: respawn
shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini'
- name: local_proxy
user: postgres
sysvInitAction: respawn
shell: '/usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter'
- name: sql-exporter
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter.yml -web.listen-address=:9399'
- name: sql-exporter-autoscaling
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
- filename: compute_ctl-sudoers
content: |
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
group neon-postgres {
perm {
admin {
uid = postgres;
}
task {
gid = users;
}
}
memory {}
}
build: |
# Build cgroup-tools
#
# At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically
# libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-monitor
# requires cgroup v2, so we'll build cgroup-tools ourselves.
#
# At time of migration to bookworm (2024-10-09), debian has a version of libcgroup/cgroup-tools 2.0.2,
# and it _probably_ can be used as-is. However, we'll build it ourselves to minimise the changeset
# for debian version migration.
#
FROM debian:bookworm-slim as libcgroup-builder
ENV LIBCGROUP_VERSION=v2.0.3
RUN set -exu \
&& apt update \
&& apt install --no-install-recommends -y \
git \
ca-certificates \
automake \
cmake \
make \
gcc \
byacc \
flex \
libtool \
libpam0g-dev \
&& git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \
&& INSTALL_DIR="/libcgroup-install" \
&& mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \
&& cd libcgroup \
# extracted from bootstrap.sh, with modified flags:
&& (test -d m4 || mkdir m4) \
&& autoreconf -fi \
&& rm -rf autom4te.cache \
&& CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \
# actually build the thing...
&& make install
merge: |
# tweak nofile limits
RUN set -e \
&& echo 'fs.file-max = 1048576' >>/etc/sysctl.conf \
&& test ! -e /etc/security || ( \
echo '* - nofile 1048576' >>/etc/security/limits.conf \
&& echo 'root - nofile 1048576' >>/etc/security/limits.conf \
)
# Allow postgres user (compute_ctl) to run swap resizer.
# Need to install sudo in order to allow this.
#
# Also, remove the 'read' permission from group/other on /neonvm/bin/resize-swap, just to be safe.
RUN set -e \
&& apt update \
&& apt install --no-install-recommends -y \
sudo \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
COPY compute_ctl-sudoers /etc/sudoers.d/compute_ctl-sudoers
COPY cgconfig.conf /etc/cgconfig.conf
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/

View File

@@ -97,21 +97,7 @@ impl ComputeControlPlane {
for endpoint_dir in std::fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
{
let ep_res = Endpoint::from_dir_entry(endpoint_dir?, &env);
let ep = match ep_res {
Ok(ep) => ep,
Err(e) => match e.downcast::<std::io::Error>() {
Ok(e) => {
// A parallel task could delete an endpoint while we have just scanned the directory
if e.kind() == std::io::ErrorKind::NotFound {
continue;
} else {
Err(e)?
}
}
Err(e) => Err(e)?,
},
};
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
}

View File

@@ -31,12 +31,9 @@ pub enum Scope {
/// The scope used by pageservers in upcalls to storage controller and cloud control plane
#[serde(rename = "generations_api")]
GenerationsApi,
/// Allows access to control plane managment API and all storage controller endpoints.
/// Allows access to control plane managment API and some storage controller endpoints.
Admin,
/// Allows access to control plane & storage controller endpoints used in infrastructure automation (e.g. node registration)
Infra,
/// Allows access to storage controller APIs used by the scrubber, to interrogate the state
/// of a tenant & post scrub results.
Scrubber,

View File

@@ -28,9 +28,6 @@ pub enum ApiError {
#[error("Resource temporarily unavailable: {0}")]
ResourceUnavailable(Cow<'static, str>),
#[error("Too many requests: {0}")]
TooManyRequests(Cow<'static, str>),
#[error("Shutting down")]
ShuttingDown,
@@ -76,10 +73,6 @@ impl ApiError {
err.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::TooManyRequests(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::TOO_MANY_REQUESTS,
),
ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::REQUEST_TIMEOUT,

View File

@@ -14,19 +14,14 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
}
(Scope::PageServerApi, None) => Ok(()), // access to management api for PageServerApi scope
(Scope::PageServerApi, Some(_)) => Ok(()), // access to tenant api using PageServerApi scope
(
Scope::Admin
| Scope::SafekeeperData
| Scope::GenerationsApi
| Scope::Infra
| Scope::Scrubber,
_,
) => Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Pageserver auth",
claims.scope
)
.into(),
)),
(Scope::Admin | Scope::SafekeeperData | Scope::GenerationsApi | Scope::Scrubber, _) => {
Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Pageserver auth",
claims.scope
)
.into(),
))
}
}
}

View File

@@ -715,8 +715,6 @@ async fn timeline_archival_config_handler(
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
tenant
.apply_timeline_archival_config(timeline_id, request_data.state, ctx)
.await?;

View File

@@ -493,8 +493,6 @@ pub struct OffloadedTimeline {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
pub ancestor_timeline_id: Option<TimelineId>,
/// Whether to retain the branch lsn at the ancestor or not
pub ancestor_retain_lsn: Option<Lsn>,
// TODO: once we persist offloaded state, make this lazily constructed
pub remote_client: Arc<RemoteTimelineClient>,
@@ -506,14 +504,10 @@ pub struct OffloadedTimeline {
impl OffloadedTimeline {
fn from_timeline(timeline: &Timeline) -> Self {
let ancestor_retain_lsn = timeline
.get_ancestor_timeline_id()
.map(|_timeline_id| timeline.get_ancestor_lsn());
Self {
tenant_shard_id: timeline.tenant_shard_id,
timeline_id: timeline.timeline_id,
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
ancestor_retain_lsn,
remote_client: timeline.remote_client.clone(),
delete_progress: timeline.delete_progress.clone(),
@@ -521,12 +515,6 @@ impl OffloadedTimeline {
}
}
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub enum MaybeOffloaded {
Yes,
No,
}
#[derive(Clone)]
pub enum TimelineOrOffloaded {
Timeline(Arc<Timeline>),
@@ -2265,13 +2253,12 @@ impl Tenant {
if activating {
let timelines_accessor = self.timelines.lock().unwrap();
let timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap();
let timelines_to_activate = timelines_accessor
.values()
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor);
self.initialize_gc_info(&timelines_accessor);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
@@ -3311,7 +3298,6 @@ impl Tenant {
fn initialize_gc_info(
&self,
timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
timelines_offloaded: &std::sync::MutexGuard<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
) {
// This function must be called before activation: after activation timeline create/delete operations
// might happen, and this function is not safe to run concurrently with those.
@@ -3319,37 +3305,20 @@ impl Tenant {
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId, MaybeOffloaded)>> =
BTreeMap::new();
let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId)>> = BTreeMap::new();
timelines.iter().for_each(|(timeline_id, timeline_entry)| {
if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() {
let ancestor_children = all_branchpoints.entry(*ancestor_timeline_id).or_default();
ancestor_children.push((
timeline_entry.get_ancestor_lsn(),
*timeline_id,
MaybeOffloaded::No,
));
ancestor_children.push((timeline_entry.get_ancestor_lsn(), *timeline_id));
}
});
timelines_offloaded
.iter()
.for_each(|(timeline_id, timeline_entry)| {
let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id else {
return;
};
let Some(retain_lsn) = timeline_entry.ancestor_retain_lsn else {
return;
};
let ancestor_children = all_branchpoints.entry(*ancestor_timeline_id).or_default();
ancestor_children.push((retain_lsn, *timeline_id, MaybeOffloaded::Yes));
});
// The number of bytes we always keep, irrespective of PITR: this is a constant across timelines
let horizon = self.get_gc_horizon();
// Populate each timeline's GcInfo with information about its child branches
for timeline in timelines.values() {
let mut branchpoints: Vec<(Lsn, TimelineId, MaybeOffloaded)> = all_branchpoints
let mut branchpoints: Vec<(Lsn, TimelineId)> = all_branchpoints
.remove(&timeline.timeline_id)
.unwrap_or_default();
@@ -4909,10 +4878,7 @@ mod tests {
{
let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
assert_eq!(branchpoints.len(), 1);
assert_eq!(
branchpoints[0],
(Lsn(0x40), NEW_TIMELINE_ID, MaybeOffloaded::No)
);
assert_eq!(branchpoints[0], (Lsn(0x40), NEW_TIMELINE_ID));
}
// You can read the key from the child branch even though the parent is
@@ -8295,8 +8261,8 @@ mod tests {
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![
(Lsn(0x10), tline.timeline_id, MaybeOffloaded::No),
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
(Lsn(0x10), tline.timeline_id),
(Lsn(0x20), tline.timeline_id),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
@@ -8523,8 +8489,8 @@ mod tests {
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![
(Lsn(0x10), tline.timeline_id, MaybeOffloaded::No),
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
(Lsn(0x10), tline.timeline_id),
(Lsn(0x20), tline.timeline_id),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
@@ -8757,7 +8723,7 @@ mod tests {
// Update GC info
let mut guard = parent_tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id, MaybeOffloaded::No)],
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id)],
cutoffs: GcCutoffs {
time: Lsn(0x10),
space: Lsn(0x10),
@@ -8771,7 +8737,7 @@ mod tests {
// Update GC info
let mut guard = branch_tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id, MaybeOffloaded::No)],
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id)],
cutoffs: GcCutoffs {
time: Lsn(0x50),
space: Lsn(0x50),

View File

@@ -12,7 +12,7 @@ use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::{GcError, LogicalSizeCalculationCause, Tenant};
use crate::tenant::{MaybeOffloaded, Timeline};
use crate::tenant::Timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -264,12 +264,10 @@ pub(super) async fn gather_inputs(
let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
.retain_lsns
.iter()
.filter(|(lsn, _child_id, is_offloaded)| {
lsn > &ancestor_lsn && *is_offloaded == MaybeOffloaded::No
})
.filter(|(lsn, _child_id)| lsn > &ancestor_lsn)
.copied()
// this assumes there are no other retain_lsns than the branchpoints
.map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
.map(|(lsn, _child_id)| (lsn, LsnKind::BranchPoint))
.collect::<Vec<_>>();
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));

View File

@@ -392,10 +392,6 @@ impl InMemoryLayer {
self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
}
pub(crate) fn start_lsn(&self) -> Lsn {
self.start_lsn
}
pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
self.start_lsn..self.end_lsn_or_max()
}

View File

@@ -139,10 +139,8 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{
config::TenantConf,
storage_layer::{inmemory_layer, LayerVisibilityHint},
config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
upload_queue::NotInitialized,
MaybeOffloaded,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
@@ -452,7 +450,7 @@ pub(crate) struct GcInfo {
/// Currently, this includes all points where child branches have
/// been forked off from. In the future, could also include
/// explicit user-defined snapshot points.
pub(crate) retain_lsns: Vec<(Lsn, TimelineId, MaybeOffloaded)>,
pub(crate) retain_lsns: Vec<(Lsn, TimelineId)>,
/// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs,
@@ -469,13 +467,8 @@ impl GcInfo {
self.cutoffs.select_min()
}
pub(super) fn insert_child(
&mut self,
child_id: TimelineId,
child_lsn: Lsn,
is_offloaded: MaybeOffloaded,
) {
self.retain_lsns.push((child_lsn, child_id, is_offloaded));
pub(super) fn insert_child(&mut self, child_id: TimelineId, child_lsn: Lsn) {
self.retain_lsns.push((child_lsn, child_id));
self.retain_lsns.sort_by_key(|i| i.0);
}
@@ -2171,9 +2164,7 @@ impl Timeline {
if let Some(ancestor) = &ancestor {
let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
// If we construct an explicit timeline object, it's obviously not offloaded
let is_offloaded = MaybeOffloaded::No;
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded);
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn());
}
Arc::new_cyclic(|myself| {
@@ -4884,7 +4875,7 @@ impl Timeline {
let retain_lsns = gc_info
.retain_lsns
.iter()
.map(|(lsn, _child_id, _is_offloaded)| *lsn)
.map(|(lsn, _child_id)| *lsn)
.collect();
// Gets the maximum LSN that holds the valid lease.

View File

@@ -42,7 +42,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::tenant::DeltaLayer;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
@@ -639,28 +639,10 @@ impl Timeline {
let children = self.gc_info.read().unwrap().retain_lsns.clone();
let mut readable_points = Vec::with_capacity(children.len() + 1);
for (child_lsn, _child_timeline_id, is_offloaded) in &children {
if *is_offloaded == MaybeOffloaded::Yes {
continue;
}
for (child_lsn, _child_timeline_id) in &children {
readable_points.push(*child_lsn);
}
readable_points.push(head_lsn);
// The Timeline get page process will walk all InMemoryLayers before it starts walking historic
// layers. That means it might fail to see image layers that overlap with the LSN range of
// InMemoryLayers, so there is a de-facto read point at the start_lsn of the oldest InMemoryLayer.
//
// This behavior in the getpage path is considered a but, and including InMemoryLayer's start_lsn here
// is a workaround. See https://github.com/neondatabase/neon/issues/9185
if let Some(oldest_inmemory_layer) = layer_map.frozen_layers.front() {
readable_points.push(oldest_inmemory_layer.start_lsn())
} else if let Some(open_layer) = layer_map.open_layer.as_ref() {
readable_points.push(open_layer.start_lsn());
}
readable_points.sort();
readable_points
};
@@ -1759,7 +1741,7 @@ impl Timeline {
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = gc_info.cutoffs.select_min();
for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
for (lsn, _timeline_id) in &gc_info.retain_lsns {
if lsn < &gc_cutoff {
retain_lsns_below_horizon.push(*lsn);
}

View File

@@ -43,7 +43,6 @@
#include "hll.h"
#include "bitmap.h"
#include "neon.h"
#include "neon_perf_counters.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
@@ -115,9 +114,7 @@ typedef struct FileCacheControl
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
uint64 writes; /* number of writes issued */
uint64 time_read; /* time spent reading (us) */
uint64 time_write; /* time spent writing (us) */
uint64 writes;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
@@ -273,8 +270,6 @@ lfc_shmem_startup(void)
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
lfc_ctl->writes = 0;
lfc_ctl->time_read = 0;
lfc_ctl->time_write = 0;
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
@@ -706,7 +701,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
Assert(blocks_in_chunk > 0);
for (int i = 0; i < blocks_in_chunk; i++)
@@ -801,13 +795,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
lfc_ctl->misses += iteration_misses;
pgBufferUsage.file_cache.hits += iteration_hits;
pgBufferUsage.file_cache.misses += iteration_misses;
if (iteration_hits)
{
lfc_ctl->time_read += io_time_us;
inc_page_cache_read_wait(io_time_us);
}
CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
@@ -872,7 +859,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
instr_time io_start, io_end;
Assert(blocks_in_chunk > 0);
for (int i = 0; i < blocks_in_chunk; i++)
@@ -961,13 +947,12 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
generation = lfc_ctl->generation;
entry_offset = entry->offset;
lfc_ctl->writes += blocks_in_chunk;
LWLockRelease(lfc_lock);
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
if (rc != BLCKSZ * blocks_in_chunk)
@@ -980,17 +965,9 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (lfc_ctl->generation == generation)
{
uint64 time_spent_us;
CriticalAssert(LFC_ENABLED());
/* Place entry to the head of LRU list */
CriticalAssert(entry->access_count > 0);
lfc_ctl->writes += blocks_in_chunk;
INSTR_TIME_SUBTRACT(io_start, io_end);
time_spent_us = INSTR_TIME_GET_MICROSEC(io_start);
lfc_ctl->time_write += time_spent_us;
inc_page_cache_write_wait(time_spent_us);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);

View File

@@ -50,52 +50,28 @@ NeonPerfCountersShmemInit(void)
}
}
static inline void
inc_iohist(IOHistogram hist, uint64 latency_us)
/*
* Count a GetPage wait operation.
*/
void
inc_getpage_wait(uint64 latency_us)
{
int lo = 0;
int hi = NUM_IO_WAIT_BUCKETS - 1;
int hi = NUM_GETPAGE_WAIT_BUCKETS - 1;
/* Find the right bucket with binary search */
while (lo < hi)
{
int mid = (lo + hi) / 2;
if (latency_us < io_wait_bucket_thresholds[mid])
if (latency_us < getpage_wait_bucket_thresholds[mid])
hi = mid;
else
lo = mid + 1;
}
hist->wait_us_bucket[lo]++;
hist->wait_us_sum += latency_us;
hist->wait_us_count++;
}
/*
* Count a GetPage wait operation.
*/
void
inc_getpage_wait(uint64 latency)
{
inc_iohist(&MyNeonCounters->getpage_hist, latency);
}
/*
* Count an LFC read wait operation.
*/
void
inc_page_cache_read_wait(uint64 latency)
{
inc_iohist(&MyNeonCounters->file_cache_read_hist, latency);
}
/*
* Count an LFC write wait operation.
*/
void
inc_page_cache_write_wait(uint64 latency)
{
inc_iohist(&MyNeonCounters->file_cache_write_hist, latency);
MyNeonCounters->getpage_wait_us_bucket[lo]++;
MyNeonCounters->getpage_wait_us_sum += latency_us;
MyNeonCounters->getpage_wait_us_count++;
}
/*
@@ -105,91 +81,77 @@ inc_page_cache_write_wait(uint64 latency)
typedef struct
{
const char *name;
char *name;
bool is_bucket;
double bucket_le;
double value;
} metric_t;
static int
histogram_to_metrics(IOHistogram histogram,
metric_t *metrics,
const char *count,
const char *sum,
const char *bucket)
static metric_t *
neon_perf_counters_to_metrics(neon_per_backend_counters *counters)
{
int i = 0;
uint64 bucket_accum = 0;
#define NUM_METRICS (2 + NUM_GETPAGE_WAIT_BUCKETS + 8)
metric_t *metrics = palloc((NUM_METRICS + 1) * sizeof(metric_t));
uint64 bucket_accum;
int i = 0;
metrics[i].name = count;
metrics[i].name = "getpage_wait_seconds_count";
metrics[i].is_bucket = false;
metrics[i].value = (double) histogram->wait_us_count;
metrics[i].value = (double) counters->getpage_wait_us_count;
i++;
metrics[i].name = sum;
metrics[i].name = "getpage_wait_seconds_sum";
metrics[i].is_bucket = false;
metrics[i].value = (double) histogram->wait_us_sum / 1000000.0;
metrics[i].value = ((double) counters->getpage_wait_us_sum) / 1000000.0;
i++;
for (int bucketno = 0; bucketno < NUM_IO_WAIT_BUCKETS; bucketno++)
bucket_accum = 0;
for (int bucketno = 0; bucketno < NUM_GETPAGE_WAIT_BUCKETS; bucketno++)
{
uint64 threshold = io_wait_bucket_thresholds[bucketno];
uint64 threshold = getpage_wait_bucket_thresholds[bucketno];
bucket_accum += histogram->wait_us_bucket[bucketno];
bucket_accum += counters->getpage_wait_us_bucket[bucketno];
metrics[i].name = bucket;
metrics[i].name = "getpage_wait_seconds_bucket";
metrics[i].is_bucket = true;
metrics[i].bucket_le = (threshold == UINT64_MAX) ? INFINITY : ((double) threshold) / 1000000.0;
metrics[i].value = (double) bucket_accum;
i++;
}
return i;
}
static metric_t *
neon_perf_counters_to_metrics(neon_per_backend_counters *counters)
{
#define NUM_METRICS ((2 + NUM_IO_WAIT_BUCKETS) * 3 + 10)
metric_t *metrics = palloc((NUM_METRICS + 1) * sizeof(metric_t));
int i = 0;
#define APPEND_METRIC(_name) do { \
metrics[i].name = #_name; \
metrics[i].is_bucket = false; \
metrics[i].value = (double) counters->_name; \
i++; \
} while (false)
i += histogram_to_metrics(&counters->getpage_hist, &metrics[i],
"getpage_wait_seconds_count",
"getpage_wait_seconds_sum",
"getpage_wait_seconds_bucket");
APPEND_METRIC(getpage_prefetch_requests_total);
APPEND_METRIC(getpage_sync_requests_total);
APPEND_METRIC(getpage_prefetch_misses_total);
APPEND_METRIC(getpage_prefetch_discards_total);
APPEND_METRIC(pageserver_requests_sent_total);
APPEND_METRIC(pageserver_disconnects_total);
APPEND_METRIC(pageserver_send_flushes_total);
APPEND_METRIC(pageserver_open_requests);
APPEND_METRIC(getpage_prefetches_buffered);
APPEND_METRIC(file_cache_hits_total);
i += histogram_to_metrics(&counters->file_cache_read_hist, &metrics[i],
"file_cache_read_wait_seconds_count",
"file_cache_read_wait_seconds_sum",
"file_cache_read_wait_seconds_bucket");
i += histogram_to_metrics(&counters->file_cache_write_hist, &metrics[i],
"file_cache_write_wait_seconds_count",
"file_cache_write_wait_seconds_sum",
"file_cache_write_wait_seconds_bucket");
metrics[i].name = "getpage_prefetch_requests_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->getpage_prefetch_requests_total;
i++;
metrics[i].name = "getpage_sync_requests_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->getpage_sync_requests_total;
i++;
metrics[i].name = "getpage_prefetch_misses_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->getpage_prefetch_misses_total;
i++;
metrics[i].name = "getpage_prefetch_discards_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->getpage_prefetch_discards_total;
i++;
metrics[i].name = "pageserver_requests_sent_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->pageserver_requests_sent_total;
i++;
metrics[i].name = "pageserver_disconnects_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->pageserver_disconnects_total;
i++;
metrics[i].name = "pageserver_send_flushes_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->pageserver_send_flushes_total;
i++;
metrics[i].name = "file_cache_hits_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->file_cache_hits_total;
i++;
Assert(i == NUM_METRICS);
#undef APPEND_METRIC
#undef NUM_METRICS
/* NULL entry marks end of array */
metrics[i].name = NULL;
metrics[i].value = 0;
@@ -254,15 +216,6 @@ neon_get_backend_perf_counters(PG_FUNCTION_ARGS)
return (Datum) 0;
}
static inline void
histogram_merge_into(IOHistogram into, IOHistogram from)
{
into->wait_us_count += from->wait_us_count;
into->wait_us_sum += from->wait_us_sum;
for (int bucketno = 0; bucketno < NUM_IO_WAIT_BUCKETS; bucketno++)
into->wait_us_bucket[bucketno] += from->wait_us_bucket[bucketno];
}
PG_FUNCTION_INFO_V1(neon_get_perf_counters);
Datum
neon_get_perf_counters(PG_FUNCTION_ARGS)
@@ -281,7 +234,10 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
{
neon_per_backend_counters *counters = &neon_per_backend_counters_shared[procno];
histogram_merge_into(&totals.getpage_hist, &counters->getpage_hist);
totals.getpage_wait_us_count += counters->getpage_wait_us_count;
totals.getpage_wait_us_sum += counters->getpage_wait_us_sum;
for (int bucketno = 0; bucketno < NUM_GETPAGE_WAIT_BUCKETS; bucketno++)
totals.getpage_wait_us_bucket[bucketno] += counters->getpage_wait_us_bucket[bucketno];
totals.getpage_prefetch_requests_total += counters->getpage_prefetch_requests_total;
totals.getpage_sync_requests_total += counters->getpage_sync_requests_total;
totals.getpage_prefetch_misses_total += counters->getpage_prefetch_misses_total;
@@ -289,11 +245,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
totals.pageserver_requests_sent_total += counters->pageserver_requests_sent_total;
totals.pageserver_disconnects_total += counters->pageserver_disconnects_total;
totals.pageserver_send_flushes_total += counters->pageserver_send_flushes_total;
totals.pageserver_open_requests += counters->pageserver_open_requests;
totals.getpage_prefetches_buffered += counters->getpage_prefetches_buffered;
totals.file_cache_hits_total += counters->file_cache_hits_total;
histogram_merge_into(&totals.file_cache_read_hist, &counters->file_cache_read_hist);
histogram_merge_into(&totals.file_cache_write_hist, &counters->file_cache_write_hist);
}
metrics = neon_perf_counters_to_metrics(&totals);

View File

@@ -15,26 +15,17 @@
#include "storage/proc.h"
#endif
static const uint64 io_wait_bucket_thresholds[] = {
2, 3, 6, 10, /* 0 us - 10 us */
20, 30, 60, 100, /* 10 us - 100 us */
static const uint64 getpage_wait_bucket_thresholds[] = {
20, 30, 60, 100, /* 0 - 100 us */
200, 300, 600, 1000, /* 100 us - 1 ms */
2000, 3000, 6000, 10000, /* 1 ms - 10 ms */
20000, 30000, 60000, 100000, /* 10 ms - 100 ms */
200000, 300000, 600000, 1000000, /* 100 ms - 1 s */
2000000, 3000000, 6000000, 10000000, /* 1 s - 10 s */
20000000, 30000000, 60000000, 100000000, /* 10 s - 100 s */
UINT64_MAX,
};
#define NUM_IO_WAIT_BUCKETS (lengthof(io_wait_bucket_thresholds))
typedef struct IOHistogramData
{
uint64 wait_us_count;
uint64 wait_us_sum;
uint64 wait_us_bucket[NUM_IO_WAIT_BUCKETS];
} IOHistogramData;
typedef IOHistogramData *IOHistogram;
#define NUM_GETPAGE_WAIT_BUCKETS (lengthof(getpage_wait_bucket_thresholds))
typedef struct
{
@@ -48,7 +39,9 @@ typedef struct
* the backend, but the 'neon_backend_perf_counters' view will convert
* them to seconds, to make them more idiomatic as prometheus metrics.
*/
IOHistogramData getpage_hist;
uint64 getpage_wait_us_count;
uint64 getpage_wait_us_sum;
uint64 getpage_wait_us_bucket[NUM_GETPAGE_WAIT_BUCKETS];
/*
* Total number of speculative prefetch Getpage requests and synchronous
@@ -57,11 +50,7 @@ typedef struct
uint64 getpage_prefetch_requests_total;
uint64 getpage_sync_requests_total;
/*
* Total number of readahead misses; consisting of either prefetches that
* don't satisfy the LSN bounds, or cases where no readahead was issued
* for the read.
*/
/* XXX: It's not clear to me when these misses happen. */
uint64 getpage_prefetch_misses_total;
/*
@@ -91,16 +80,6 @@ typedef struct
* this can be smaller than pageserver_requests_sent_total.
*/
uint64 pageserver_send_flushes_total;
/*
* Number of open requests to PageServer.
*/
uint64 pageserver_open_requests;
/*
* Number of unused prefetches currently cached in this backend.
*/
uint64 getpage_prefetches_buffered;
/*
* Number of requests satisfied from the LFC.
@@ -112,9 +91,6 @@ typedef struct
*/
uint64 file_cache_hits_total;
/* LFC I/O time buckets */
IOHistogramData file_cache_read_hist;
IOHistogramData file_cache_write_hist;
} neon_per_backend_counters;
/* Pointer to the shared memory array of neon_per_backend_counters structs */
@@ -135,8 +111,6 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
#endif
extern void inc_getpage_wait(uint64 latency);
extern void inc_page_cache_read_wait(uint64 latency);
extern void inc_page_cache_write_wait(uint64 latency);
extern Size NeonPerfCountersShmemSize(void);
extern void NeonPerfCountersShmemInit(void);

View File

@@ -488,11 +488,6 @@ readahead_buffer_resize(int newsize, void *extra)
newPState->n_unused -= 1;
}
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
MyNeonCounters->pageserver_open_requests =
MyPState->n_requests_inflight;
for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1)
{
prefetch_set_unused(end);
@@ -626,8 +621,6 @@ prefetch_read(PrefetchRequest *slot)
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
MyPState->ring_receive += 1;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
/* update slot state */
slot->status = PRFS_RECEIVED;
@@ -681,15 +674,6 @@ prefetch_on_ps_disconnect(void)
prefetch_set_unused(ring_index);
}
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
*/
MyNeonCounters->pageserver_open_requests =
MyPState->n_requests_inflight;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
}
/*
@@ -722,9 +706,6 @@ prefetch_set_unused(uint64 ring_index)
MyPState->n_responses_buffered -= 1;
MyPState->n_unused += 1;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
}
else
{
@@ -839,15 +820,6 @@ prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
hashkey.buftag = tag;
Retry:
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
*/
MyNeonCounters->pageserver_open_requests =
MyPState->ring_unused - MyPState->ring_receive;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
min_ring_index = UINT64_MAX;
for (int i = 0; i < nblocks; i++)
{
@@ -1029,9 +1001,6 @@ Retry:
prefetch_do_request(slot, lsns);
}
MyNeonCounters->pageserver_open_requests =
MyPState->ring_unused - MyPState->ring_receive;
Assert(any_hits);
Assert(GetPrfSlot(min_ring_index)->status == PRFS_REQUESTED ||
@@ -1107,10 +1076,8 @@ page_server_request(void const *req)
{
/* do nothing */
}
MyNeonCounters->pageserver_open_requests++;
consume_prefetch_responses();
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
}
PG_CATCH();
{
@@ -1119,8 +1086,6 @@ page_server_request(void const *req)
* point, but this currently seems fine for now.
*/
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;
PG_RE_THROW();
}
PG_END_TRY();

View File

@@ -77,7 +77,7 @@ subtle.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
tokio-postgres = { workspace = true, features = ["with-serde_json-1"] }
tokio-postgres.workspace = true
tokio-postgres-rustls.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
@@ -101,7 +101,7 @@ jose-jwa = "0.1.2"
jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] }
signature = "2"
ecdsa = "0.16"
p256 = { version = "0.13", features = ["jwk"] }
p256 = "0.13"
rsa = "0.9"
workspace_hack.workspace = true

View File

@@ -1,24 +1,18 @@
use crate::{
auth,
cache::Cached,
compute,
auth, compute,
config::AuthenticationConfig,
context::RequestMonitoring,
control_plane::{self, provider::NodeInfo, CachedNodeInfo},
control_plane::{self, provider::NodeInfo},
error::{ReportableError, UserFacingError},
proxy::connect_compute::ComputeConnectBackend,
stream::PqStream,
waiters,
};
use async_trait::async_trait;
use pq_proto::BeMessage as Be;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_postgres::config::SslMode;
use tracing::{info, info_span};
use super::ComputeCredentialKeys;
#[derive(Debug, Error)]
pub(crate) enum WebAuthError {
#[error(transparent)]
@@ -31,11 +25,6 @@ pub(crate) enum WebAuthError {
Io(#[from] std::io::Error),
}
#[derive(Debug)]
pub struct ConsoleRedirectBackend {
console_uri: reqwest::Url,
}
impl UserFacingError for WebAuthError {
fn to_string_client(&self) -> String {
"Internal error".to_string()
@@ -68,40 +57,7 @@ pub(crate) fn new_psql_session_id() -> String {
hex::encode(rand::random::<[u8; 8]>())
}
impl ConsoleRedirectBackend {
pub fn new(console_uri: reqwest::Url) -> Self {
Self { console_uri }
}
pub(crate) async fn authenticate(
&self,
ctx: &RequestMonitoring,
auth_config: &'static AuthenticationConfig,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<ConsoleRedirectNodeInfo> {
authenticate(ctx, auth_config, &self.console_uri, client)
.await
.map(ConsoleRedirectNodeInfo)
}
}
pub struct ConsoleRedirectNodeInfo(pub(super) NodeInfo);
#[async_trait]
impl ComputeConnectBackend for ConsoleRedirectNodeInfo {
async fn wake_compute(
&self,
_ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
Ok(Cached::new_uncached(self.0.clone()))
}
fn get_keys(&self) -> &ComputeCredentialKeys {
&ComputeCredentialKeys::None
}
}
async fn authenticate(
pub(super) async fn authenticate(
ctx: &RequestMonitoring,
auth_config: &'static AuthenticationConfig,
link_uri: &reqwest::Url,

View File

@@ -17,8 +17,6 @@ use crate::{
RoleName,
};
use super::ComputeCredentialKeys;
// TODO(conrad): make these configurable.
const CLOCK_SKEW_LEEWAY: Duration = Duration::from_secs(30);
const MIN_RENEW: Duration = Duration::from_secs(30);
@@ -243,7 +241,7 @@ impl JwkCacheEntryLock {
endpoint: EndpointId,
role_name: &RoleName,
fetch: &F,
) -> Result<ComputeCredentialKeys, anyhow::Error> {
) -> Result<(), anyhow::Error> {
// JWT compact form is defined to be
// <B64(Header)> || . || <B64(Payload)> || . || <B64(Signature)>
// where Signature = alg(<B64(Header)> || . || <B64(Payload)>);
@@ -302,9 +300,9 @@ impl JwkCacheEntryLock {
key => bail!("unsupported key type {key:?}"),
};
let payloadb = base64::decode_config(payload, base64::URL_SAFE_NO_PAD)
let payload = base64::decode_config(payload, base64::URL_SAFE_NO_PAD)
.context("Provided authentication token is not a valid JWT encoding")?;
let payload = serde_json::from_slice::<JwtPayload<'_>>(&payloadb)
let payload = serde_json::from_slice::<JwtPayload<'_>>(&payload)
.context("Provided authentication token is not a valid JWT encoding")?;
tracing::debug!(?payload, "JWT signature valid with claims");
@@ -329,7 +327,7 @@ impl JwkCacheEntryLock {
);
}
Ok(ComputeCredentialKeys::JwtPayload(payloadb))
Ok(())
}
}
@@ -341,7 +339,7 @@ impl JwkCache {
role_name: &RoleName,
fetch: &F,
jwt: &str,
) -> Result<ComputeCredentialKeys, anyhow::Error> {
) -> Result<(), anyhow::Error> {
// try with just a read lock first
let key = (endpoint.clone(), role_name.clone());
let entry = self.map.get(&key).as_deref().map(Arc::clone);

View File

@@ -8,7 +8,6 @@ use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
pub use console_redirect::ConsoleRedirectBackend;
pub(crate) use console_redirect::WebAuthError;
use ipnet::{Ipv4Net, Ipv6Net};
use local::LocalBackend;
@@ -22,7 +21,7 @@ use crate::cache::Cached;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::GetAuthInfoError;
use crate::control_plane::provider::{CachedRoleSecret, ControlPlaneBackend};
use crate::control_plane::AuthSecret;
use crate::control_plane::{AuthSecret, NodeInfo};
use crate::intern::EndpointIdInt;
use crate::metrics::Metrics;
use crate::proxy::connect_compute::ComputeConnectBackend;
@@ -37,7 +36,7 @@ use crate::{
provider::{CachedAllowedIps, CachedNodeInfo},
Api,
},
stream,
stream, url,
};
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};
@@ -66,9 +65,11 @@ impl<T> std::ops::Deref for MaybeOwned<'_, T> {
/// * However, when we substitute `T` with [`ComputeUserInfoMaybeEndpoint`],
/// this helps us provide the credentials only to those auth
/// backends which require them for the authentication process.
pub enum Backend<'a, T> {
pub enum Backend<'a, T, D> {
/// Cloud API (V2).
ControlPlane(MaybeOwned<'a, ControlPlaneBackend>, T),
/// Authentication via a web browser.
ConsoleRedirect(MaybeOwned<'a, url::ApiUrl>, D),
/// Local proxy uses configured auth credentials and does not wake compute
Local(MaybeOwned<'a, LocalBackend>),
}
@@ -89,7 +90,7 @@ impl Clone for Box<dyn TestBackend> {
}
}
impl std::fmt::Display for Backend<'_, ()> {
impl std::fmt::Display for Backend<'_, (), ()> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ControlPlane(api, ()) => match &**api {
@@ -105,39 +106,46 @@ impl std::fmt::Display for Backend<'_, ()> {
#[cfg(test)]
ControlPlaneBackend::Test(_) => fmt.debug_tuple("ControlPlane::Test").finish(),
},
Self::ConsoleRedirect(url, ()) => fmt
.debug_tuple("ConsoleRedirect")
.field(&url.as_str())
.finish(),
Self::Local(_) => fmt.debug_tuple("Local").finish(),
}
}
}
impl<T> Backend<'_, T> {
impl<T, D> Backend<'_, T, D> {
/// Very similar to [`std::option::Option::as_ref`].
/// This helps us pass structured config to async tasks.
pub(crate) fn as_ref(&self) -> Backend<'_, &T> {
pub(crate) fn as_ref(&self) -> Backend<'_, &T, &D> {
match self {
Self::ControlPlane(c, x) => Backend::ControlPlane(MaybeOwned::Borrowed(c), x),
Self::ConsoleRedirect(c, x) => Backend::ConsoleRedirect(MaybeOwned::Borrowed(c), x),
Self::Local(l) => Backend::Local(MaybeOwned::Borrowed(l)),
}
}
}
impl<'a, T> Backend<'a, T> {
impl<'a, T, D> Backend<'a, T, D> {
/// Very similar to [`std::option::Option::map`].
/// Maps [`Backend<T>`] to [`Backend<R>`] by applying
/// a function to a contained value.
pub(crate) fn map<R>(self, f: impl FnOnce(T) -> R) -> Backend<'a, R> {
pub(crate) fn map<R>(self, f: impl FnOnce(T) -> R) -> Backend<'a, R, D> {
match self {
Self::ControlPlane(c, x) => Backend::ControlPlane(c, f(x)),
Self::ConsoleRedirect(c, x) => Backend::ConsoleRedirect(c, x),
Self::Local(l) => Backend::Local(l),
}
}
}
impl<'a, T, E> Backend<'a, Result<T, E>> {
impl<'a, T, D, E> Backend<'a, Result<T, E>, D> {
/// Very similar to [`std::option::Option::transpose`].
/// This is most useful for error handling.
pub(crate) fn transpose(self) -> Result<Backend<'a, T>, E> {
pub(crate) fn transpose(self) -> Result<Backend<'a, T, D>, E> {
match self {
Self::ControlPlane(c, x) => x.map(|x| Backend::ControlPlane(c, x)),
Self::ConsoleRedirect(c, x) => Ok(Backend::ConsoleRedirect(c, x)),
Self::Local(l) => Ok(Backend::Local(l)),
}
}
@@ -167,12 +175,10 @@ impl ComputeUserInfo {
}
}
#[cfg_attr(test, derive(Debug))]
pub(crate) enum ComputeCredentialKeys {
#[cfg(any(test, feature = "testing"))]
Password(Vec<u8>),
AuthKeys(AuthKeys),
JwtPayload(Vec<u8>),
None,
}
@@ -233,6 +239,7 @@ impl AuthenticationConfig {
pub(crate) fn check_rate_limit(
&self,
ctx: &RequestMonitoring,
config: &AuthenticationConfig,
secret: AuthSecret,
endpoint: &EndpointId,
is_cleartext: bool,
@@ -256,7 +263,7 @@ impl AuthenticationConfig {
let limit_not_exceeded = self.rate_limiter.check(
(
endpoint_int,
MaskedIp::new(ctx.peer_addr(), self.rate_limit_ip_subnet),
MaskedIp::new(ctx.peer_addr(), config.rate_limit_ip_subnet),
),
password_weight,
);
@@ -330,6 +337,7 @@ async fn auth_quirks(
let secret = if let Some(secret) = secret {
config.check_rate_limit(
ctx,
config,
secret,
&info.endpoint,
unauthenticated_password.is_some() || allow_cleartext,
@@ -405,11 +413,12 @@ async fn authenticate_with_secret(
classic::authenticate(ctx, info, client, config, secret).await
}
impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint, &()> {
/// Get username from the credentials.
pub(crate) fn get_user(&self) -> &str {
match self {
Self::ControlPlane(_, user_info) => &user_info.user,
Self::ConsoleRedirect(_, ()) => "web",
Self::Local(_) => "local",
}
}
@@ -423,7 +432,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
allow_cleartext: bool,
config: &'static AuthenticationConfig,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> auth::Result<Backend<'a, ComputeCredentials>> {
) -> auth::Result<Backend<'a, ComputeCredentials, NodeInfo>> {
let res = match self {
Self::ControlPlane(api, user_info) => {
info!(
@@ -444,6 +453,14 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
.await?;
Backend::ControlPlane(api, credentials)
}
// NOTE: this auth backend doesn't use client credentials.
Self::ConsoleRedirect(url, ()) => {
info!("performing web authentication");
let info = console_redirect::authenticate(ctx, config, &url, client).await?;
Backend::ConsoleRedirect(url, info)
}
Self::Local(_) => {
return Err(auth::AuthError::bad_auth_method("invalid for local proxy"))
}
@@ -454,13 +471,14 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
}
}
impl Backend<'_, ComputeUserInfo> {
impl Backend<'_, ComputeUserInfo, &()> {
pub(crate) async fn get_role_secret(
&self,
ctx: &RequestMonitoring,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
match self {
Self::ControlPlane(api, user_info) => api.get_role_secret(ctx, user_info).await,
Self::ConsoleRedirect(_, ()) => Ok(Cached::new_uncached(None)),
Self::Local(_) => Ok(Cached::new_uncached(None)),
}
}
@@ -473,19 +491,21 @@ impl Backend<'_, ComputeUserInfo> {
Self::ControlPlane(api, user_info) => {
api.get_allowed_ips_and_secret(ctx, user_info).await
}
Self::ConsoleRedirect(_, ()) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
}
}
}
#[async_trait::async_trait]
impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
impl ComputeConnectBackend for Backend<'_, ComputeCredentials, NodeInfo> {
async fn wake_compute(
&self,
ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
match self {
Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await,
Self::ConsoleRedirect(_, info) => Ok(Cached::new_uncached(info.clone())),
Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())),
}
}
@@ -493,6 +513,31 @@ impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
fn get_keys(&self) -> &ComputeCredentialKeys {
match self {
Self::ControlPlane(_, creds) => &creds.keys,
Self::ConsoleRedirect(_, _) => &ComputeCredentialKeys::None,
Self::Local(_) => &ComputeCredentialKeys::None,
}
}
}
#[async_trait::async_trait]
impl ComputeConnectBackend for Backend<'_, ComputeCredentials, &()> {
async fn wake_compute(
&self,
ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
match self {
Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await,
Self::ConsoleRedirect(_, ()) => {
unreachable!("web auth flow doesn't support waking the compute")
}
Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())),
}
}
fn get_keys(&self) -> &ComputeCredentialKeys {
match self {
Self::ControlPlane(_, creds) => &creds.keys,
Self::ConsoleRedirect(_, ()) => &ComputeCredentialKeys::None,
Self::Local(_) => &ComputeCredentialKeys::None,
}
}

View File

@@ -6,12 +6,9 @@ use compute_api::spec::LocalProxySpec;
use dashmap::DashMap;
use futures::future::Either;
use proxy::{
auth::{
self,
backend::{
jwt::JwkCache,
local::{LocalBackend, JWKS_ROLE_MAP},
},
auth::backend::{
jwt::JwkCache,
local::{LocalBackend, JWKS_ROLE_MAP},
},
cancellation::CancellationHandlerMain,
config::{self, AuthenticationConfig, HttpConfig, ProxyConfig, RetryConfig},
@@ -135,7 +132,6 @@ async fn main() -> anyhow::Result<()> {
let args = LocalProxyCliArgs::parse();
let config = build_config(&args)?;
let auth_backend = build_auth_backend(&args)?;
// before we bind to any ports, write the process ID to a file
// so that compute-ctl can find our process later
@@ -197,7 +193,6 @@ async fn main() -> anyhow::Result<()> {
let task = serverless::task_main(
config,
auth_backend,
http_listener,
shutdown.clone(),
Arc::new(CancellationHandlerMain::new(
@@ -262,6 +257,9 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
Ok(Box::leak(Box::new(ProxyConfig {
tls_config: None,
auth_backend: proxy::auth::Backend::Local(proxy::auth::backend::MaybeOwned::Owned(
LocalBackend::new(args.compute),
)),
metric_collection: None,
allow_self_signed_compute: false,
http_config,
@@ -288,17 +286,6 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
})))
}
/// auth::Backend is created at proxy startup, and lives forever.
fn build_auth_backend(
args: &LocalProxyCliArgs,
) -> anyhow::Result<&'static auth::Backend<'static, ()>> {
let auth_backend = proxy::auth::Backend::Local(proxy::auth::backend::MaybeOwned::Owned(
LocalBackend::new(args.compute),
));
Ok(Box::leak(Box::new(auth_backend)))
}
async fn refresh_config_loop(path: Utf8PathBuf, rx: Arc<Notify>) {
loop {
rx.notified().await;

View File

@@ -10,7 +10,6 @@ use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::jwt::JwkCache;
use proxy::auth::backend::AuthRateLimiter;
use proxy::auth::backend::ConsoleRedirectBackend;
use proxy::auth::backend::MaybeOwned;
use proxy::cancellation::CancelMap;
use proxy::cancellation::CancellationHandler;
@@ -312,12 +311,8 @@ async fn main() -> anyhow::Result<()> {
let args = ProxyCliArgs::parse();
let config = build_config(&args)?;
let auth_backend = build_auth_backend(&args)?;
match auth_backend {
Either::Left(auth_backend) => info!("Authentication backend: {auth_backend}"),
Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
};
info!("Authentication backend: {}", config.auth_backend);
info!("Using region: {}", args.aws_region);
let region_provider =
@@ -464,41 +459,24 @@ async fn main() -> anyhow::Result<()> {
// client facing tasks. these will exit on error or on cancellation
// cancellation returns Ok(())
let mut client_tasks = JoinSet::new();
match auth_backend {
Either::Left(auth_backend) => {
if let Some(proxy_listener) = proxy_listener {
client_tasks.spawn(proxy::proxy::task_main(
config,
auth_backend,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
));
}
if let Some(proxy_listener) = proxy_listener {
client_tasks.spawn(proxy::proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
));
}
if let Some(serverless_listener) = serverless_listener {
client_tasks.spawn(serverless::task_main(
config,
auth_backend,
serverless_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
));
}
}
Either::Right(auth_backend) => {
if let Some(proxy_listener) = proxy_listener {
client_tasks.spawn(proxy::console_redirect_proxy::task_main(
config,
auth_backend,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
));
}
}
if let Some(serverless_listener) = serverless_listener {
client_tasks.spawn(serverless::task_main(
config,
serverless_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
));
}
client_tasks.spawn(proxy::context::parquet::worker(
@@ -528,7 +506,7 @@ async fn main() -> anyhow::Result<()> {
));
}
if let Either::Left(auth::Backend::ControlPlane(api, _)) = &auth_backend {
if let auth::Backend::ControlPlane(api, _) = &config.auth_backend {
if let proxy::control_plane::provider::ControlPlaneBackend::Management(api) = &**api {
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
@@ -632,83 +610,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
bail!("dynamic rate limiter should be disabled");
}
let config::ConcurrencyLockOptions {
shards,
limiter,
epoch,
timeout,
} = args.connect_compute_lock.parse()?;
info!(
?limiter,
shards,
?epoch,
"Using NodeLocks (connect_compute)"
);
let connect_compute_locks = control_plane::locks::ApiLocks::new(
"connect_compute_lock",
limiter,
shards,
timeout,
epoch,
&Metrics::get().proxy.connect_compute_lock,
)?;
let http_config = HttpConfig {
accept_websockets: !args.is_auth_broker,
pool_options: GlobalConnPoolOptions {
max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint,
gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch,
pool_shards: args.sql_over_http.sql_over_http_pool_shards,
idle_timeout: args.sql_over_http.sql_over_http_idle_timeout,
opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
},
cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards),
client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold,
max_request_size_bytes: args.sql_over_http.sql_over_http_max_request_size_bytes,
max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes,
};
let authentication_config = AuthenticationConfig {
jwks_cache: JwkCache::default(),
thread_pool,
scram_protocol_timeout: args.scram_protocol_timeout,
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
ip_allowlist_check_enabled: !args.is_private_access_proxy,
is_auth_broker: args.is_auth_broker,
accept_jwts: args.is_auth_broker,
webauth_confirmation_timeout: args.webauth_confirmation_timeout,
};
let config = ProxyConfig {
tls_config,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
proxy_protocol_v2: args.proxy_protocol_v2,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_compute_locks,
connect_to_compute_retry_config: config::RetryConfig::parse(
&args.connect_to_compute_retry,
)?,
};
let config = Box::leak(Box::new(config));
tokio::spawn(config.connect_compute_locks.garbage_collect_worker());
Ok(config)
}
/// auth::Backend is created at proxy startup, and lives forever.
fn build_auth_backend(
args: &ProxyCliArgs,
) -> anyhow::Result<Either<&'static auth::Backend<'static, ()>, &'static ConsoleRedirectBackend>> {
match &args.auth_backend {
let auth_backend = match &args.auth_backend {
AuthBackendType::Console => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
@@ -758,11 +660,12 @@ fn build_auth_backend(
wake_compute_endpoint_rate_limiter,
);
let api = control_plane::provider::ControlPlaneBackend::Management(api);
let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
auth::Backend::ControlPlane(MaybeOwned::Owned(api), ())
}
let config = Box::leak(Box::new(auth_backend));
Ok(Either::Left(config))
AuthBackendType::Web => {
let url = args.uri.parse()?;
auth::Backend::ConsoleRedirect(MaybeOwned::Owned(url), ())
}
#[cfg(feature = "testing")]
@@ -770,23 +673,79 @@ fn build_auth_backend(
let url = args.auth_endpoint.parse()?;
let api = control_plane::provider::mock::Api::new(url, !args.is_private_access_proxy);
let api = control_plane::provider::ControlPlaneBackend::PostgresMock(api);
let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
let config = Box::leak(Box::new(auth_backend));
Ok(Either::Left(config))
auth::Backend::ControlPlane(MaybeOwned::Owned(api), ())
}
};
AuthBackendType::Web => {
let url = args.uri.parse()?;
let backend = ConsoleRedirectBackend::new(url);
let config::ConcurrencyLockOptions {
shards,
limiter,
epoch,
timeout,
} = args.connect_compute_lock.parse()?;
info!(
?limiter,
shards,
?epoch,
"Using NodeLocks (connect_compute)"
);
let connect_compute_locks = control_plane::locks::ApiLocks::new(
"connect_compute_lock",
limiter,
shards,
timeout,
epoch,
&Metrics::get().proxy.connect_compute_lock,
)?;
let config = Box::leak(Box::new(backend));
let http_config = HttpConfig {
accept_websockets: !args.is_auth_broker,
pool_options: GlobalConnPoolOptions {
max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint,
gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch,
pool_shards: args.sql_over_http.sql_over_http_pool_shards,
idle_timeout: args.sql_over_http.sql_over_http_idle_timeout,
opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
},
cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards),
client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold,
max_request_size_bytes: args.sql_over_http.sql_over_http_max_request_size_bytes,
max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes,
};
let authentication_config = AuthenticationConfig {
jwks_cache: JwkCache::default(),
thread_pool,
scram_protocol_timeout: args.scram_protocol_timeout,
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
ip_allowlist_check_enabled: !args.is_private_access_proxy,
is_auth_broker: args.is_auth_broker,
accept_jwts: args.is_auth_broker,
webauth_confirmation_timeout: args.webauth_confirmation_timeout,
};
Ok(Either::Right(config))
}
}
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
proxy_protocol_v2: args.proxy_protocol_v2,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_compute_locks,
connect_to_compute_retry_config: config::RetryConfig::parse(
&args.connect_to_compute_retry,
)?,
}));
tokio::spawn(config.connect_compute_locks.garbage_collect_worker());
Ok(config)
}
#[cfg(test)]

View File

@@ -1,5 +1,8 @@
use crate::{
auth::backend::{jwt::JwkCache, AuthRateLimiter},
auth::{
self,
backend::{jwt::JwkCache, AuthRateLimiter},
},
control_plane::locks::ApiLocks,
rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig},
scram::threadpool::ThreadPool,
@@ -26,6 +29,7 @@ use x509_parser::oid_registry;
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub auth_backend: auth::Backend<'static, (), ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,

View File

@@ -1,217 +0,0 @@
use crate::auth::backend::ConsoleRedirectBackend;
use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::proxy::{
prepare_client_connection, run_until_cancelled, ClientRequestError, ErrorSource,
};
use crate::{
cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal},
context::RequestMonitoring,
error::ReportableError,
metrics::{Metrics, NumClientConnectionsGuard},
protocol2::read_proxy_protocol,
proxy::handshake::{handshake, HandshakeData},
};
use futures::TryFutureExt;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, Instrument};
use crate::proxy::{
connect_compute::{connect_to_compute, TcpMechanism},
passthrough::ProxyPassthrough,
};
pub async fn task_main(
config: &'static ProxyConfig,
backend: &'static ConsoleRedirectBackend,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
}
// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let connections = tokio_util::task::task_tracker::TaskTracker::new();
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
let conn_gauge = Metrics::get()
.proxy
.client_connections
.guard(crate::metrics::Protocol::Tcp);
let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
connections.spawn(async move {
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
Err(e) => {
error!("per-client task finished with an error: {e:#}");
return;
}
Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
error!("missing required proxy protocol header");
return;
}
Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
error!("proxy protocol header not supported");
return;
}
Ok((socket, Some(addr))) => (socket, addr.ip()),
Ok((socket, None)) => (socket, peer_addr.ip()),
};
match socket.inner.set_nodelay(true) {
Ok(()) => {}
Err(e) => {
error!("per-client task finished with an error: failed to set socket option: {e:#}");
return;
}
};
let ctx = RequestMonitoring::new(
session_id,
peer_addr,
crate::metrics::Protocol::Tcp,
&config.region,
);
let span = ctx.span();
let startup = Box::pin(
handle_client(
config,
backend,
&ctx,
cancellation_handler,
socket,
conn_gauge,
)
.instrument(span.clone()),
);
let res = startup.await;
match res {
Err(e) => {
// todo: log and push to ctx the error kind
ctx.set_error_kind(e.get_error_kind());
error!(parent: &span, "per-client task finished with an error: {e:#}");
}
Ok(None) => {
ctx.set_success();
}
Ok(Some(p)) => {
ctx.set_success();
ctx.log_connect();
match p.proxy_pass().instrument(span.clone()).await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
}
Err(ErrorSource::Compute(e)) => {
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
}
}
}
}
});
}
connections.close();
drop(listener);
// Drain connections
connections.wait().await;
Ok(())
}
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config: &'static ProxyConfig,
backend: &'static ConsoleRedirectBackend,
ctx: &RequestMonitoring,
cancellation_handler: Arc<CancellationHandlerMain>,
stream: S,
conn_gauge: NumClientConnectionsGuard<'static>,
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
info!(
protocol = %ctx.protocol(),
"handling interactive connection from client"
);
let metrics = &Metrics::get().proxy;
let proto = ctx.protocol();
let request_gauge = metrics.connection_requests.guard(proto);
let tls = config.tls_config.as_ref();
let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, stream, tls, record_handshake_error);
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
return Ok(cancellation_handler
.cancel_session(cancel_key_data, ctx.session_id())
.await
.map(|()| None)?)
}
};
drop(pause);
ctx.set_db_options(params.clone());
let user_info = match backend
.authenticate(ctx, &config.authentication_config, &mut stream)
.await
{
Ok(auth_result) => auth_result,
Err(e) => {
return stream.throw_error(e).await?;
}
};
let mut node = connect_to_compute(
ctx,
&TcpMechanism {
params: &params,
locks: &config.connect_compute_locks,
},
&user_info,
config.allow_self_signed_compute,
config.wake_compute_retry_config,
config.connect_to_compute_retry_config,
)
.or_else(|e| stream.throw_error(e))
.await?;
let session = cancellation_handler.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm
// driver in pipeline mode sends startup, password and first query
// immediately after opening the connection.
let (stream, read_buf) = stream.into_inner();
node.stream.write_all(&read_buf).await?;
Ok(Some(ProxyPassthrough {
client: stream,
aux: node.aux.clone(),
compute: node,
_req: request_gauge,
_conn: conn_gauge,
_cancel: session,
}))
}

View File

@@ -81,12 +81,12 @@ pub(crate) mod errors {
Reason::EndpointNotFound => ErrorKind::User,
Reason::BranchNotFound => ErrorKind::User,
Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit,
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::Quota,
Reason::ActiveTimeQuotaExceeded => ErrorKind::Quota,
Reason::ComputeTimeQuotaExceeded => ErrorKind::Quota,
Reason::WrittenDataQuotaExceeded => ErrorKind::Quota,
Reason::DataTransferQuotaExceeded => ErrorKind::Quota,
Reason::LogicalSizeQuotaExceeded => ErrorKind::Quota,
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::User,
Reason::ActiveTimeQuotaExceeded => ErrorKind::User,
Reason::ComputeTimeQuotaExceeded => ErrorKind::User,
Reason::WrittenDataQuotaExceeded => ErrorKind::User,
Reason::DataTransferQuotaExceeded => ErrorKind::User,
Reason::LogicalSizeQuotaExceeded => ErrorKind::User,
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
Reason::RunningOperations => ErrorKind::ControlPlane,
@@ -103,7 +103,7 @@ pub(crate) mod errors {
} if error
.contains("compute time quota of non-primary branches is exceeded") =>
{
crate::error::ErrorKind::Quota
crate::error::ErrorKind::User
}
ControlPlaneError {
http_status_code: http::StatusCode::LOCKED,
@@ -112,7 +112,7 @@ pub(crate) mod errors {
} if error.contains("quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::Quota
crate::error::ErrorKind::User
}
ControlPlaneError {
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
@@ -309,7 +309,7 @@ impl NodeInfo {
#[cfg(any(test, feature = "testing"))]
ComputeCredentialKeys::Password(password) => self.config.password(password),
ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys),
ComputeCredentialKeys::JwtPayload(_) | ComputeCredentialKeys::None => &mut self.config,
ComputeCredentialKeys::None => &mut self.config,
};
}
}

View File

@@ -22,7 +22,7 @@ use futures::TryFutureExt;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{debug, info, info_span, warn, Instrument};
use tracing::{debug, error, info, info_span, warn, Instrument};
const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
@@ -456,7 +456,7 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
});
body.http_status_code = status;
warn!("console responded with an error ({status}): {body:?}");
error!("console responded with an error ({status}): {body:?}");
Err(ApiError::ControlPlane(body))
}

View File

@@ -49,10 +49,6 @@ pub enum ErrorKind {
#[label(rename = "serviceratelimit")]
ServiceRateLimit,
/// Proxy quota limit violation
#[label(rename = "quota")]
Quota,
/// internal errors
Service,
@@ -74,7 +70,6 @@ impl ErrorKind {
ErrorKind::ClientDisconnect => "clientdisconnect",
ErrorKind::RateLimit => "ratelimit",
ErrorKind::ServiceRateLimit => "serviceratelimit",
ErrorKind::Quota => "quota",
ErrorKind::Service => "service",
ErrorKind::ControlPlane => "controlplane",
ErrorKind::Postgres => "postgres",

View File

@@ -95,7 +95,6 @@ pub mod cache;
pub mod cancellation;
pub mod compute;
pub mod config;
pub mod console_redirect_proxy;
pub mod context;
pub mod control_plane;
pub mod error;

View File

@@ -35,7 +35,7 @@ use std::sync::Arc;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn, Instrument};
use tracing::{error, info, Instrument};
use self::{
connect_compute::{connect_to_compute, TcpMechanism},
@@ -61,7 +61,6 @@ pub async fn run_until_cancelled<F: std::future::Future>(
pub async fn task_main(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
@@ -96,15 +95,15 @@ pub async fn task_main(
connections.spawn(async move {
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
Err(e) => {
warn!("per-client task finished with an error: {e:#}");
error!("per-client task finished with an error: {e:#}");
return;
}
Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
warn!("missing required proxy protocol header");
error!("missing required proxy protocol header");
return;
}
Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
warn!("proxy protocol header not supported");
error!("proxy protocol header not supported");
return;
}
Ok((socket, Some(addr))) => (socket, addr.ip()),
@@ -130,7 +129,6 @@ pub async fn task_main(
let startup = Box::pin(
handle_client(
config,
auth_backend,
&ctx,
cancellation_handler,
socket,
@@ -146,7 +144,7 @@ pub async fn task_main(
Err(e) => {
// todo: log and push to ctx the error kind
ctx.set_error_kind(e.get_error_kind());
warn!(parent: &span, "per-client task finished with an error: {e:#}");
error!(parent: &span, "per-client task finished with an error: {e:#}");
}
Ok(None) => {
ctx.set_success();
@@ -157,7 +155,7 @@ pub async fn task_main(
match p.proxy_pass().instrument(span.clone()).await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
warn!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
}
Err(ErrorSource::Compute(e)) => {
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
@@ -245,10 +243,8 @@ impl ReportableError for ClientRequestError {
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
ctx: &RequestMonitoring,
cancellation_handler: Arc<CancellationHandlerMain>,
stream: S,
@@ -289,7 +285,8 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let common_names = tls.map(|tls| &tls.common_names);
// Extract credentials which we're going to use for auth.
let result = auth_backend
let result = config
.auth_backend
.as_ref()
.map(|()| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, &params, hostname, common_names))
.transpose();
@@ -356,7 +353,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
/// Finish client connection initialization: confirm auth success, send params, etc.
#[tracing::instrument(skip_all)]
pub(crate) async fn prepare_client_connection<P>(
async fn prepare_client_connection<P>(
node: &compute::PostgresConnection,
session: &cancellation::Session<P>,
stream: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,

View File

@@ -71,7 +71,7 @@ impl<P, S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<P, S> {
pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
if let Err(err) = self.compute.cancel_closure.try_cancel_query().await {
tracing::warn!(?err, "could not cancel the query in the database");
tracing::error!(?err, "could not cancel the query in the database");
}
res
}

View File

@@ -552,7 +552,7 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
fn helper_create_connect_info(
mechanism: &TestConnectMechanism,
) -> auth::Backend<'static, ComputeCredentials> {
) -> auth::Backend<'static, ComputeCredentials, &()> {
let user_info = auth::Backend::ControlPlane(
MaybeOwned::Owned(ControlPlaneBackend::Test(Box::new(mechanism.clone()))),
ComputeCredentials {

View File

@@ -6,7 +6,7 @@ use redis::{
ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult,
};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};
use super::elasticache::CredentialsProvider;
@@ -89,7 +89,7 @@ impl ConnectionWithCredentialsProvider {
return Ok(());
}
Err(e) => {
warn!("Error during PING: {e:?}");
error!("Error during PING: {e:?}");
}
}
} else {
@@ -121,7 +121,7 @@ impl ConnectionWithCredentialsProvider {
info!("Connection succesfully established");
}
Err(e) => {
warn!("Connection is broken. Error during PING: {e:?}");
error!("Connection is broken. Error during PING: {e:?}");
}
}
self.con = Some(con);

View File

@@ -146,7 +146,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
{
Ok(()) => {}
Err(e) => {
tracing::warn!("failed to cancel session: {e}");
tracing::error!("failed to cancel session: {e}");
}
}
}

View File

@@ -3,17 +3,15 @@ use std::{io, sync::Arc, time::Duration};
use async_trait::async_trait;
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use tokio::net::{lookup_host, TcpStream};
use tokio_postgres::types::ToSql;
use tracing::{debug, field::display, info};
use tracing::{field::display, info};
use crate::{
auth::{
self,
backend::{local::StaticAuthRules, ComputeCredentials, ComputeUserInfo},
check_peer_addr_is_in_list, AuthError,
},
compute,
config::ProxyConfig,
config::{AuthenticationConfig, ProxyConfig},
context::RequestMonitoring,
control_plane::{
errors::{GetAuthInfoError, WakeComputeError},
@@ -28,21 +26,18 @@ use crate::{
retry::{CouldRetry, ShouldRetryWakeCompute},
},
rate_limiter::EndpointRateLimiter,
EndpointId, Host,
Host,
};
use super::{
conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool},
http_conn_pool::{self, poll_http2_client},
local_conn_pool::{self, LocalClient, LocalConnPool},
};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool>,
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pub(crate) config: &'static ProxyConfig,
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
}
@@ -50,13 +45,18 @@ impl PoolingBackend {
pub(crate) async fn authenticate_with_password(
&self,
ctx: &RequestMonitoring,
config: &AuthenticationConfig,
user_info: &ComputeUserInfo,
password: &[u8],
) -> Result<ComputeCredentials, AuthError> {
let user_info = user_info.clone();
let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
let backend = self
.config
.auth_backend
.as_ref()
.map(|()| user_info.clone());
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if self.config.authentication_config.ip_allowlist_check_enabled
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
@@ -75,6 +75,7 @@ impl PoolingBackend {
let secret = match cached_secret.value.clone() {
Some(secret) => self.config.authentication_config.check_rate_limit(
ctx,
config,
secret,
&user_info.endpoint,
true,
@@ -86,13 +87,9 @@ impl PoolingBackend {
}
};
let ep = EndpointIdInt::from(&user_info.endpoint);
let auth_outcome = crate::auth::validate_password_and_exchange(
&self.config.authentication_config.thread_pool,
ep,
password,
secret,
)
.await?;
let auth_outcome =
crate::auth::validate_password_and_exchange(&config.thread_pool, ep, password, secret)
.await?;
let res = match auth_outcome {
crate::sasl::Outcome::Success(key) => {
info!("user successfully authenticated");
@@ -112,13 +109,13 @@ impl PoolingBackend {
pub(crate) async fn authenticate_with_jwt(
&self,
ctx: &RequestMonitoring,
config: &AuthenticationConfig,
user_info: &ComputeUserInfo,
jwt: String,
) -> Result<ComputeCredentials, AuthError> {
match &self.auth_backend {
) -> Result<(), AuthError> {
match &self.config.auth_backend {
crate::auth::Backend::ControlPlane(console, ()) => {
self.config
.authentication_config
config
.jwks_cache
.check_jwt(
ctx,
@@ -130,15 +127,13 @@ impl PoolingBackend {
.await
.map_err(|e| AuthError::auth_failed(e.to_string()))?;
Ok(ComputeCredentials {
info: user_info.clone(),
keys: crate::auth::backend::ComputeCredentialKeys::None,
})
Ok(())
}
crate::auth::Backend::ConsoleRedirect(_, ()) => Err(AuthError::auth_failed(
"JWT login over web auth proxy is not supported",
)),
crate::auth::Backend::Local(_) => {
let keys = self
.config
.authentication_config
config
.jwks_cache
.check_jwt(
ctx,
@@ -150,10 +145,8 @@ impl PoolingBackend {
.await
.map_err(|e| AuthError::auth_failed(e.to_string()))?;
Ok(ComputeCredentials {
info: user_info.clone(),
keys,
})
// todo: rewrite JWT signature with key shared somehow between local proxy and postgres
Ok(())
}
}
}
@@ -183,7 +176,7 @@ impl PoolingBackend {
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| keys);
let backend = self.config.auth_backend.as_ref().map(|()| keys);
crate::proxy::connect_compute::connect_to_compute(
ctx,
&TokioMechanism {
@@ -215,14 +208,14 @@ impl PoolingBackend {
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
info: ComputeUserInfo {
user: conn_info.user_info.user.clone(),
endpoint: EndpointId::from(format!("{}-local-proxy", conn_info.user_info.endpoint)),
options: conn_info.user_info.options.clone(),
},
keys: crate::auth::backend::ComputeCredentialKeys::None,
});
let backend = self
.config
.auth_backend
.as_ref()
.map(|()| ComputeCredentials {
info: conn_info.user_info.clone(),
keys: crate::auth::backend::ComputeCredentialKeys::None,
});
crate::proxy::connect_compute::connect_to_compute(
ctx,
&HyperMechanism {
@@ -238,77 +231,6 @@ impl PoolingBackend {
)
.await
}
/// Connect to postgres over localhost.
///
/// We expect postgres to be started here, so we won't do any retries.
///
/// # Panics
///
/// Panics if called with a non-local_proxy backend.
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
pub(crate) async fn connect_to_local_postgres(
&self,
ctx: &RequestMonitoring,
conn_info: ConnInfo,
) -> Result<LocalClient<tokio_postgres::Client>, HttpConnError> {
if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
return Ok(client);
}
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
let mut node_info = match &self.auth_backend {
auth::Backend::ControlPlane(_, ()) => {
unreachable!("only local_proxy can connect to local postgres")
}
auth::Backend::Local(local) => local.node_info.clone(),
};
let config = node_info
.config
.user(&conn_info.user_info.user)
.dbname(&conn_info.dbname);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
drop(pause);
tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
let handle = local_conn_pool::poll_client(
self.local_pool.clone(),
ctx,
conn_info,
client,
connection,
conn_id,
node_info.aux.clone(),
);
let kid = handle.get_client().get_process_id() as i64;
let jwk = p256::PublicKey::from(handle.key().verifying_key()).to_jwk();
debug!(kid, ?jwk, "setting up backend session state");
// initiates the auth session
handle
.get_client()
.query(
"select auth.init($1, $2);",
&[
&kid as &(dyn ToSql + Sync),
&tokio_postgres::types::Json(jwk),
],
)
.await?;
info!(?kid, "backend session state init");
Ok(handle)
}
}
#[derive(Debug, thiserror::Error)]
@@ -319,8 +241,6 @@ pub(crate) enum HttpConnError {
PostgresConnectionError(#[from] tokio_postgres::Error),
#[error("could not connection to local-proxy in compute")]
LocalProxyConnectionError(#[from] LocalProxyConnError),
#[error("could not parse JWT payload")]
JwtPayloadError(serde_json::Error),
#[error("could not get auth info")]
GetAuthInfo(#[from] GetAuthInfoError),
@@ -346,7 +266,6 @@ impl ReportableError for HttpConnError {
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
HttpConnError::JwtPayloadError(_) => ErrorKind::User,
HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
HttpConnError::AuthError(a) => a.get_error_kind(),
HttpConnError::WakeCompute(w) => w.get_error_kind(),
@@ -361,7 +280,6 @@ impl UserFacingError for HttpConnError {
HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
HttpConnError::PostgresConnectionError(p) => p.to_string(),
HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
HttpConnError::JwtPayloadError(p) => p.to_string(),
HttpConnError::GetAuthInfo(c) => c.to_string_client(),
HttpConnError::AuthError(c) => c.to_string_client(),
HttpConnError::WakeCompute(c) => c.to_string_client(),
@@ -378,7 +296,6 @@ impl CouldRetry for HttpConnError {
HttpConnError::PostgresConnectionError(e) => e.could_retry(),
HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
HttpConnError::ConnectionClosedAbruptly(_) => false,
HttpConnError::JwtPayloadError(_) => false,
HttpConnError::GetAuthInfo(_) => false,
HttpConnError::AuthError(_) => false,
HttpConnError::WakeCompute(_) => false,
@@ -505,12 +422,8 @@ impl ConnectMechanism for HyperMechanism {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let port = *node_info.config.get_ports().first().ok_or_else(|| {
HttpConnError::WakeCompute(WakeComputeError::BadComputeAddress(
"local-proxy port missing on compute address".into(),
))
})?;
let res = connect_http2(&host, port, timeout).await;
// let port = node_info.config.get_ports().first().unwrap_or_else(10432);
let res = connect_http2(&host, 10432, timeout).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;

View File

@@ -41,10 +41,6 @@ pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes,
err.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::TooManyRequests(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::TOO_MANY_REQUESTS,
),
ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::REQUEST_TIMEOUT,

View File

@@ -1,544 +0,0 @@
use futures::{future::poll_fn, Future};
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
use p256::ecdsa::{Signature, SigningKey};
use parking_lot::RwLock;
use rand::rngs::OsRng;
use serde_json::Value;
use signature::Signer;
use std::task::{ready, Poll};
use std::{collections::HashMap, pin::pin, sync::Arc, sync::Weak, time::Duration};
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::types::ToSql;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_util::sync::CancellationToken;
use typed_json::json;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{context::RequestMonitoring, DbName, RoleName};
use tracing::{debug, error, warn, Span};
use tracing::{info, info_span, Instrument};
use super::backend::HttpConnError;
use super::conn_pool::{ClientInnerExt, ConnInfo};
struct ConnPoolEntry<C: ClientInnerExt> {
conn: ClientInner<C>,
_last_access: std::time::Instant,
}
// /// key id for the pg_session_jwt state
// static PG_SESSION_JWT_KID: AtomicU64 = AtomicU64::new(1);
// Per-endpoint connection pool, (dbname, username) -> DbUserConnPool
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
pools: HashMap<(DbName, RoleName), DbUserConnPool<C>>,
total_conns: usize,
max_conns: usize,
global_pool_size_max_conns: usize,
}
impl<C: ClientInnerExt> EndpointConnPool<C> {
fn get_conn_entry(&mut self, db_user: (DbName, RoleName)) -> Option<ConnPoolEntry<C>> {
let Self {
pools, total_conns, ..
} = self;
pools
.get_mut(&db_user)
.and_then(|pool_entries| pool_entries.get_conn_entry(total_conns))
}
fn remove_client(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
let Self {
pools, total_conns, ..
} = self;
if let Some(pool) = pools.get_mut(&db_user) {
let old_len = pool.conns.len();
pool.conns.retain(|conn| conn.conn.conn_id != conn_id);
let new_len = pool.conns.len();
let removed = old_len - new_len;
if removed > 0 {
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(removed as i64);
}
*total_conns -= removed;
removed > 0
} else {
false
}
}
fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInner<C>) {
let conn_id = client.conn_id;
if client.is_closed() {
info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because connection is closed");
return;
}
let global_max_conn = pool.read().global_pool_size_max_conns;
if pool.read().total_conns >= global_max_conn {
info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because pool is full");
return;
}
// return connection to the pool
let mut returned = false;
let mut per_db_size = 0;
let total_conns = {
let mut pool = pool.write();
if pool.total_conns < pool.max_conns {
let pool_entries = pool.pools.entry(conn_info.db_and_user()).or_default();
pool_entries.conns.push(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
returned = true;
per_db_size = pool_entries.conns.len();
pool.total_conns += 1;
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.inc();
}
pool.total_conns
};
// do logging outside of the mutex
if returned {
info!(%conn_id, "local_pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
}
}
impl<C: ClientInnerExt> Drop for EndpointConnPool<C> {
fn drop(&mut self) {
if self.total_conns > 0 {
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(self.total_conns as i64);
}
}
}
pub(crate) struct DbUserConnPool<C: ClientInnerExt> {
conns: Vec<ConnPoolEntry<C>>,
}
impl<C: ClientInnerExt> Default for DbUserConnPool<C> {
fn default() -> Self {
Self { conns: Vec::new() }
}
}
impl<C: ClientInnerExt> DbUserConnPool<C> {
fn clear_closed_clients(&mut self, conns: &mut usize) -> usize {
let old_len = self.conns.len();
self.conns.retain(|conn| !conn.conn.is_closed());
let new_len = self.conns.len();
let removed = old_len - new_len;
*conns -= removed;
removed
}
fn get_conn_entry(&mut self, conns: &mut usize) -> Option<ConnPoolEntry<C>> {
let mut removed = self.clear_closed_clients(conns);
let conn = self.conns.pop();
if conn.is_some() {
*conns -= 1;
removed += 1;
}
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(removed as i64);
conn
}
}
pub(crate) struct LocalConnPool<C: ClientInnerExt> {
global_pool: RwLock<EndpointConnPool<C>>,
config: &'static crate::config::HttpConfig,
}
impl<C: ClientInnerExt> LocalConnPool<C> {
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
Arc::new(Self {
global_pool: RwLock::new(EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: config.pool_options.max_conns_per_endpoint,
global_pool_size_max_conns: config.pool_options.max_total_conns,
}),
config,
})
}
pub(crate) fn get_idle_timeout(&self) -> Duration {
self.config.pool_options.idle_timeout
}
// pub(crate) fn shutdown(&self) {
// let mut pool = self.global_pool.write();
// pool.pools.clear();
// pool.total_conns = 0;
// }
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestMonitoring,
conn_info: &ConnInfo,
) -> Result<Option<LocalClient<C>>, HttpConnError> {
let mut client: Option<ClientInner<C>> = None;
if let Some(entry) = self
.global_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
// ok return cached connection if found and establish a new one otherwise
if let Some(client) = client {
if client.is_closed() {
info!("local_pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
info!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"local_pool: reusing connection '{conn_info}'"
);
client.session.send(ctx.session_id())?;
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(LocalClient::new(
client,
conn_info.clone(),
Arc::downgrade(self),
)));
}
Ok(None)
}
}
pub(crate) fn poll_client(
global_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
ctx: &RequestMonitoring,
conn_info: ConnInfo,
client: tokio_postgres::Client,
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> LocalClient<tokio_postgres::Client> {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let mut session_id = ctx.session_id();
let (tx, mut rx) = tokio::sync::watch::channel(session_id);
let span = info_span!(parent: None, "connection", %conn_id);
let cold_start_info = ctx.cold_start_info();
span.in_scope(|| {
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
});
let pool = Arc::downgrade(&global_pool);
let pool_clone = pool.clone();
let db_user = conn_info.db_and_user();
let idle = global_pool.get_idle_timeout();
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();
tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
let mut cancelled = pin!(cancelled);
poll_fn(move |cx| {
if cancelled.as_mut().poll(cx).is_ready() {
info!("connection dropped");
return Poll::Ready(())
}
match rx.has_changed() {
Ok(true) => {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
}
Err(_) => {
info!("connection dropped");
return Poll::Ready(())
}
_ => {}
}
// 5 minute idle connection timeout
if idle_timeout.as_mut().poll(cx).is_ready() {
idle_timeout.as_mut().reset(Instant::now() + idle);
info!("connection idle");
if let Some(pool) = pool.clone().upgrade() {
// remove client from pool - should close the connection if it's idle.
// does nothing if the client is currently checked-out and in-use
if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
info!("idle connection removed");
}
}
}
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session_id, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session_id, "unknown message");
}
Some(Err(e)) => {
error!(%session_id, "connection error: {}", e);
break
}
None => {
info!("connection closed");
break
}
}
}
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
info!("closed connection removed");
}
}
Poll::Ready(())
}).await;
}
.instrument(span));
let key = SigningKey::random(&mut OsRng);
let inner = ClientInner {
inner: client,
session: tx,
cancel,
aux,
conn_id,
key,
jti: 0,
};
LocalClient::new(inner, conn_info, pool_clone)
}
struct ClientInner<C: ClientInnerExt> {
inner: C,
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
aux: MetricsAuxInfo,
conn_id: uuid::Uuid,
// needed for pg_session_jwt state
key: SigningKey,
jti: u64,
}
impl<C: ClientInnerExt> Drop for ClientInner<C> {
fn drop(&mut self) {
// on client drop, tell the conn to shut down
self.cancel.cancel();
}
}
impl<C: ClientInnerExt> ClientInner<C> {
pub(crate) fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
impl<C: ClientInnerExt> LocalClient<C> {
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
let aux = &self.inner.as_ref().unwrap().aux;
USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
})
}
}
pub(crate) struct LocalClient<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInner<C>>,
conn_info: ConnInfo,
pool: Weak<LocalConnPool<C>>,
}
pub(crate) struct Discard<'a, C: ClientInnerExt> {
conn_info: &'a ConnInfo,
pool: &'a mut Weak<LocalConnPool<C>>,
}
impl<C: ClientInnerExt> LocalClient<C> {
pub(self) fn new(
inner: ClientInner<C>,
conn_info: ConnInfo,
pool: Weak<LocalConnPool<C>>,
) -> Self {
Self {
inner: Some(inner),
span: Span::current(),
conn_info,
pool,
}
}
pub(crate) fn inner(&mut self) -> (&mut C, Discard<'_, C>) {
let Self {
inner,
pool,
conn_info,
span: _,
} = self;
let inner = inner.as_mut().expect("client inner should not be removed");
(&mut inner.inner, Discard { conn_info, pool })
}
pub(crate) fn key(&self) -> &SigningKey {
let inner = &self
.inner
.as_ref()
.expect("client inner should not be removed");
&inner.key
}
}
impl LocalClient<tokio_postgres::Client> {
pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> {
let inner = self
.inner
.as_mut()
.expect("client inner should not be removed");
inner.jti += 1;
let kid = inner.inner.get_process_id();
let header = json!({"kid":kid}).to_string();
let mut payload = serde_json::from_slice::<serde_json::Map<String, Value>>(payload)
.map_err(HttpConnError::JwtPayloadError)?;
payload.insert("jti".to_string(), Value::Number(inner.jti.into()));
let payload = Value::Object(payload).to_string();
debug!(
kid,
jti = inner.jti,
?header,
?payload,
"signing new ephemeral JWT"
);
let token = sign_jwt(&inner.key, header, payload);
// initiates the auth session
inner.inner.simple_query("discard all").await?;
inner
.inner
.query(
"select auth.jwt_session_init($1)",
&[&token as &(dyn ToSql + Sync)],
)
.await?;
info!(kid, jti = inner.jti, "user session state init");
Ok(())
}
}
fn sign_jwt(sk: &SigningKey, header: String, payload: String) -> String {
let header = Base64UrlUnpadded::encode_string(header.as_bytes());
let payload = Base64UrlUnpadded::encode_string(payload.as_bytes());
let message = format!("{header}.{payload}");
let sig: Signature = sk.sign(message.as_bytes());
let base64_sig = Base64UrlUnpadded::encode_string(&sig.to_bytes());
format!("{message}.{base64_sig}")
}
impl<C: ClientInnerExt> Discard<'_, C> {
pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!(
"local_pool: throwing away connection '{conn_info}' because connection is not idle"
);
}
}
pub(crate) fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {
info!("local_pool: throwing away connection '{conn_info}' because connection is potentially in a broken state");
}
}
}
impl<C: ClientInnerExt> LocalClient<C> {
pub fn get_client(&self) -> &C {
&self
.inner
.as_ref()
.expect("client inner should not be removed")
.inner
}
fn do_drop(&mut self) -> Option<impl FnOnce()> {
let conn_info = self.conn_info.clone();
let client = self
.inner
.take()
.expect("client inner should not be removed");
if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() {
let current_span = self.span.clone();
// return connection to the pool
return Some(move || {
let _span = current_span.enter();
EndpointConnPool::put(&conn_pool.global_pool, &conn_info, client);
});
}
None
}
}
impl<C: ClientInnerExt> Drop for LocalClient<C> {
fn drop(&mut self) {
if let Some(drop) = self.do_drop() {
tokio::task::spawn_blocking(drop);
}
}
}

View File

@@ -8,7 +8,6 @@ mod conn_pool;
mod http_conn_pool;
mod http_util;
mod json;
mod local_conn_pool;
mod sql_over_http;
mod websocket;
@@ -48,14 +47,13 @@ use std::pin::{pin, Pin};
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn, Instrument};
use tracing::{error, info, warn, Instrument};
use utils::http::error::ApiError;
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
pub async fn task_main(
config: &'static ProxyConfig,
auth_backend: &'static crate::auth::Backend<'static, ()>,
ws_listener: TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
@@ -65,7 +63,6 @@ pub async fn task_main(
info!("websocket server has shut down");
}
let local_pool = local_conn_pool::LocalConnPool::new(&config.http_config);
let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config);
{
let conn_pool = Arc::clone(&conn_pool);
@@ -108,10 +105,8 @@ pub async fn task_main(
let backend = Arc::new(PoolingBackend {
http_conn_pool: Arc::clone(&http_conn_pool),
local_pool,
pool: Arc::clone(&conn_pool),
config,
auth_backend,
endpoint_rate_limiter: Arc::clone(&endpoint_rate_limiter),
});
let tls_acceptor: Arc<dyn MaybeTlsAcceptor> = match config.tls_config.as_ref() {
@@ -243,7 +238,7 @@ async fn connection_startup(
let (conn, peer) = match read_proxy_protocol(conn).await {
Ok(c) => c,
Err(e) => {
tracing::warn!(?session_id, %peer_addr, "failed to accept TCP connection: invalid PROXY protocol V2 header: {e:#}");
tracing::error!(?session_id, %peer_addr, "failed to accept TCP connection: invalid PROXY protocol V2 header: {e:#}");
return None;
}
};
@@ -399,7 +394,6 @@ async fn request_handler(
async move {
if let Err(e) = websocket::serve_websocket(
config,
backend.auth_backend,
ctx,
websocket,
cancellation_handler,
@@ -408,7 +402,7 @@ async fn request_handler(
)
.await
{
warn!("error in websocket connection: {e:#}");
error!("error in websocket connection: {e:#}");
}
}
.instrument(span),

View File

@@ -40,12 +40,11 @@ use url::Url;
use urlencoding;
use utils::http::error::ApiError;
use crate::auth::backend::ComputeCredentialKeys;
use crate::auth::backend::ComputeCredentials;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::endpoint_sni;
use crate::auth::ComputeUserInfoParseError;
use crate::config::AuthenticationConfig;
use crate::config::HttpConfig;
use crate::config::ProxyConfig;
use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
@@ -57,22 +56,20 @@ use crate::metrics::Metrics;
use crate::proxy::run_until_cancelled;
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::usage_metrics::MetricCounter;
use crate::usage_metrics::MetricCounterRecorder;
use crate::DbName;
use crate::RoleName;
use super::backend::LocalProxyConnError;
use super::backend::PoolingBackend;
use super::conn_pool;
use super::conn_pool::AuthData;
use super::conn_pool::Client;
use super::conn_pool::ConnInfo;
use super::conn_pool::ConnInfoWithAuth;
use super::http_util::json_response;
use super::json::json_to_pg_text;
use super::json::pg_text_row_to_json;
use super::json::JsonConversionError;
use super::local_conn_pool;
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -555,7 +552,7 @@ async fn handle_inner(
match conn_info.auth {
AuthData::Jwt(jwt) if config.authentication_config.is_auth_broker => {
handle_auth_broker_inner(ctx, request, conn_info.conn_info, jwt, backend).await
handle_auth_broker_inner(config, ctx, request, conn_info.conn_info, jwt, backend).await
}
auth => {
handle_db_inner(
@@ -623,35 +620,37 @@ async fn handle_db_inner(
let authenticate_and_connect = Box::pin(
async {
let is_local_proxy = matches!(backend.auth_backend, crate::auth::Backend::Local(_));
let keys = match auth {
AuthData::Password(pw) => {
backend
.authenticate_with_password(ctx, &conn_info.user_info, &pw)
.authenticate_with_password(
ctx,
&config.authentication_config,
&conn_info.user_info,
&pw,
)
.await?
}
AuthData::Jwt(jwt) => {
backend
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.await?
}
};
let client = match keys.keys {
ComputeCredentialKeys::JwtPayload(payload) if is_local_proxy => {
let mut client = backend.connect_to_local_postgres(ctx, conn_info).await?;
client.set_jwt_session(&payload).await?;
Client::Local(client)
}
_ => {
let client = backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.authenticate_with_jwt(
ctx,
&config.authentication_config,
&conn_info.user_info,
jwt,
)
.await?;
Client::Remote(client)
ComputeCredentials {
info: conn_info.user_info.clone(),
keys: crate::auth::backend::ComputeCredentialKeys::None,
}
}
};
let client = backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await?;
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else
ctx.success();
@@ -681,7 +680,7 @@ async fn handle_db_inner(
// Now execute the query and return the result.
let json_output = match payload {
Payload::Single(stmt) => {
stmt.process(&config.http_config, cancel, &mut client, parsed_headers)
stmt.process(config, cancel, &mut client, parsed_headers)
.await?
}
Payload::Batch(statements) => {
@@ -699,7 +698,7 @@ async fn handle_db_inner(
}
statements
.process(&config.http_config, cancel, &mut client, parsed_headers)
.process(config, cancel, &mut client, parsed_headers)
.await?
}
};
@@ -739,6 +738,7 @@ static HEADERS_TO_FORWARD: &[&HeaderName] = &[
];
async fn handle_auth_broker_inner(
config: &'static ProxyConfig,
ctx: &RequestMonitoring,
request: Request<Incoming>,
conn_info: ConnInfo,
@@ -746,7 +746,12 @@ async fn handle_auth_broker_inner(
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
backend
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.authenticate_with_jwt(
ctx,
&config.authentication_config,
&conn_info.user_info,
jwt,
)
.await
.map_err(HttpConnError::from)?;
@@ -784,9 +789,9 @@ async fn handle_auth_broker_inner(
impl QueryData {
async fn process(
self,
config: &'static HttpConfig,
config: &'static ProxyConfig,
cancel: CancellationToken,
client: &mut Client,
client: &mut Client<tokio_postgres::Client>,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let (inner, mut discard) = client.inner();
@@ -815,7 +820,7 @@ impl QueryData {
Either::Right((_cancelled, query)) => {
tracing::info!("cancelling query");
if let Err(err) = cancel_token.cancel_query(NoTls).await {
tracing::warn!(?err, "could not cancel query");
tracing::error!(?err, "could not cancel query");
}
// wait for the query cancellation
match time::timeout(time::Duration::from_millis(100), query).await {
@@ -858,9 +863,9 @@ impl QueryData {
impl BatchQueryData {
async fn process(
self,
config: &'static HttpConfig,
config: &'static ProxyConfig,
cancel: CancellationToken,
client: &mut Client,
client: &mut Client<tokio_postgres::Client>,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
info!("starting transaction");
@@ -904,7 +909,7 @@ impl BatchQueryData {
}
Err(SqlOverHttpError::Cancelled(_)) => {
if let Err(err) = cancel_token.cancel_query(NoTls).await {
tracing::warn!(?err, "could not cancel query");
tracing::error!(?err, "could not cancel query");
}
// TODO: after cancelling, wait to see if we can get a status. maybe the connection is still safe.
discard.discard();
@@ -928,7 +933,7 @@ impl BatchQueryData {
}
async fn query_batch(
config: &'static HttpConfig,
config: &'static ProxyConfig,
cancel: CancellationToken,
transaction: &Transaction<'_>,
queries: BatchQueryData,
@@ -967,7 +972,7 @@ async fn query_batch(
}
async fn query_to_json<T: GenericClient>(
config: &'static HttpConfig,
config: &'static ProxyConfig,
client: &T,
data: QueryData,
current_size: &mut usize,
@@ -988,9 +993,9 @@ async fn query_to_json<T: GenericClient>(
rows.push(row);
// we don't have a streaming response support yet so this is to prevent OOM
// from a malicious query (eg a cross join)
if *current_size > config.max_response_size_bytes {
if *current_size > config.http_config.max_response_size_bytes {
return Err(SqlOverHttpError::ResponseTooLarge(
config.max_response_size_bytes,
config.http_config.max_response_size_bytes,
));
}
}
@@ -1053,50 +1058,3 @@ async fn query_to_json<T: GenericClient>(
Ok((ready, results))
}
enum Client {
Remote(conn_pool::Client<tokio_postgres::Client>),
Local(local_conn_pool::LocalClient<tokio_postgres::Client>),
}
enum Discard<'a> {
Remote(conn_pool::Discard<'a, tokio_postgres::Client>),
Local(local_conn_pool::Discard<'a, tokio_postgres::Client>),
}
impl Client {
fn metrics(&self) -> Arc<MetricCounter> {
match self {
Client::Remote(client) => client.metrics(),
Client::Local(local_client) => local_client.metrics(),
}
}
fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
match self {
Client::Remote(client) => {
let (c, d) = client.inner();
(c, Discard::Remote(d))
}
Client::Local(local_client) => {
let (c, d) = local_client.inner();
(c, Discard::Local(d))
}
}
}
}
impl Discard<'_> {
fn check_idle(&mut self, status: ReadyForQueryStatus) {
match self {
Discard::Remote(discard) => discard.check_idle(status),
Discard::Local(discard) => discard.check_idle(status),
}
}
fn discard(&mut self) {
match self {
Discard::Remote(discard) => discard.discard(),
Discard::Local(discard) => discard.discard(),
}
}
}

View File

@@ -129,7 +129,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
pub(crate) async fn serve_websocket(
config: &'static ProxyConfig,
auth_backend: &'static crate::auth::Backend<'static, ()>,
ctx: RequestMonitoring,
websocket: OnUpgrade,
cancellation_handler: Arc<CancellationHandlerMain>,
@@ -146,7 +145,6 @@ pub(crate) async fn serve_websocket(
let res = Box::pin(handle_client(
config,
auth_backend,
&ctx,
cancellation_handler,
WebSocketRw::new(websocket),

View File

@@ -27,7 +27,7 @@ use std::{
};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, trace, warn};
use tracing::{error, info, instrument, trace};
use utils::backoff;
use uuid::{NoContext, Timestamp};
@@ -346,7 +346,7 @@ async fn collect_metrics_iteration(
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
// Report if the metric value is suspiciously large
warn!("potentially abnormal metric value: {:?}", metric);
error!("potentially abnormal metric value: {:?}", metric);
}
}
}

View File

@@ -15,20 +15,15 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
}
Ok(())
}
(
Scope::Admin
| Scope::PageServerApi
| Scope::GenerationsApi
| Scope::Infra
| Scope::Scrubber,
_,
) => Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Safekeeper auth",
claims.scope
)
.into(),
)),
(Scope::Admin | Scope::PageServerApi | Scope::GenerationsApi | Scope::Scrubber, _) => {
Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Safekeeper auth",
claims.scope
)
.into(),
))
}
(Scope::SafekeeperData, _) => Ok(()),
}
}

View File

@@ -636,7 +636,7 @@ async fn handle_tenant_list(
}
async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Infra)?;
check_permissions(&req, Scope::Admin)?;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
@@ -1182,7 +1182,7 @@ async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, Api
/// Assumes information is only relayed to storage controller after first selecting an unique id on
/// control plane database, which means we have an id field in the request and payload.
async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Infra)?;
check_permissions(&req, Scope::Admin)?;
let body = json_request::<SafekeeperPersistence>(&mut req).await?;
let id = parse_request_param::<i64>(&req, "id")?;

View File

@@ -246,11 +246,6 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
// storage controller's auth configuration.
ApiError::InternalServerError(anyhow::anyhow!("{node} {status}: {msg}"))
}
mgmt_api::Error::ApiError(status @ StatusCode::TOO_MANY_REQUESTS, msg) => {
// Pass through 429 errors: if pageserver is asking us to wait + retry, we in
// turn ask our clients to wait + retry
ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
}
mgmt_api::Error::ApiError(status, msg) => {
// Presume general case of pageserver API errors is that we tried to do something
// that can't be done right now.

View File

@@ -317,8 +317,9 @@ pub async fn scan_pageserver_metadata(
tenant_timeline_results.push((ttid, data));
}
let tenant_id = tenant_id.expect("Must be set if results are present");
if !tenant_timeline_results.is_empty() {
let tenant_id = tenant_id.expect("Must be set if results are present");
analyze_tenant(
&remote_client,
tenant_id,

View File

@@ -64,12 +64,10 @@ By default performance tests are excluded. To run them explicitly pass performan
Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found.
`COMPATIBILITY_NEON_BIN`: The directory where the previous version of Neon binaries can be found
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
Since pageserver supports several postgres versions, `POSTGRES_DISTRIB_DIR` must contain
a subdirectory for each version with naming convention `v{PG_VERSION}/`.
Inside that dir, a `bin/postgres` binary should be present.
`COMPATIBILITY_POSTGRES_DISTRIB_DIR`: The directory where the prevoius version of postgres distribution can be found.
`DEFAULT_PG_VERSION`: The version of Postgres to use,
This is used to construct full path to the postgres binaries.
Format is 2-digit major version nubmer, i.e. `DEFAULT_PG_VERSION=16`
@@ -296,16 +294,6 @@ def test_foobar2(neon_env_builder: NeonEnvBuilder):
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
```
All the test which rely on NeonEnvBuilder, can check the various version combinations of the components.
To do this yuo may want to add the parametrize decorator with the function fixtures.utils.allpairs_versions()
E.g.
```python
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_something(
...
```
For more information about pytest fixtures, see https://docs.pytest.org/en/stable/fixture.html
At the end of a test, all the nodes in the environment are automatically stopped, so you

View File

@@ -6,7 +6,6 @@ pytest_plugins = (
"fixtures.httpserver",
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.paths",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -7,6 +7,7 @@ import json
import os
import re
import timeit
from collections.abc import Iterator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
@@ -24,8 +25,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
if TYPE_CHECKING:
from collections.abc import Iterator, Mapping
from typing import Callable, Optional
from typing import Callable, ClassVar, Optional
"""
@@ -141,28 +141,6 @@ class PgBenchRunResult:
)
# Taken from https://github.com/postgres/postgres/blob/REL_15_1/src/bin/pgbench/pgbench.c#L5144-L5171
#
# This used to be a class variable on PgBenchInitResult. However later versions
# of Python complain:
#
# ValueError: mutable default <class 'dict'> for field EXTRACTORS is not allowed: use default_factory
#
# When you do what the error tells you to do, it seems to fail our Python 3.9
# test environment. So let's just move it to a private module constant, and move
# on.
_PGBENCH_INIT_EXTRACTORS: Mapping[str, re.Pattern[str]] = {
"drop_tables": re.compile(r"drop tables (\d+\.\d+) s"),
"create_tables": re.compile(r"create tables (\d+\.\d+) s"),
"client_side_generate": re.compile(r"client-side generate (\d+\.\d+) s"),
"server_side_generate": re.compile(r"server-side generate (\d+\.\d+) s"),
"vacuum": re.compile(r"vacuum (\d+\.\d+) s"),
"primary_keys": re.compile(r"primary keys (\d+\.\d+) s"),
"foreign_keys": re.compile(r"foreign keys (\d+\.\d+) s"),
"total": re.compile(r"done in (\d+\.\d+) s"), # Total time printed by pgbench
}
@dataclasses.dataclass
class PgBenchInitResult:
total: Optional[float]
@@ -177,6 +155,20 @@ class PgBenchInitResult:
start_timestamp: int
end_timestamp: int
# Taken from https://github.com/postgres/postgres/blob/REL_15_1/src/bin/pgbench/pgbench.c#L5144-L5171
EXTRACTORS: ClassVar[dict[str, re.Pattern[str]]] = dataclasses.field(
default_factory=lambda: {
"drop_tables": re.compile(r"drop tables (\d+\.\d+) s"),
"create_tables": re.compile(r"create tables (\d+\.\d+) s"),
"client_side_generate": re.compile(r"client-side generate (\d+\.\d+) s"),
"server_side_generate": re.compile(r"server-side generate (\d+\.\d+) s"),
"vacuum": re.compile(r"vacuum (\d+\.\d+) s"),
"primary_keys": re.compile(r"primary keys (\d+\.\d+) s"),
"foreign_keys": re.compile(r"foreign keys (\d+\.\d+) s"),
"total": re.compile(r"done in (\d+\.\d+) s"), # Total time printed by pgbench
}
)
@classmethod
def parse_from_stderr(
cls,
@@ -193,7 +185,7 @@ class PgBenchInitResult:
timings: dict[str, Optional[float]] = {}
last_line_items = re.split(r"\(|\)|,", last_line)
for item in last_line_items:
for key, regex in _PGBENCH_INIT_EXTRACTORS.items():
for key, regex in cls.EXTRACTORS.items():
if (m := regex.match(item.strip())) is not None:
if key in timings:
raise RuntimeError(

View File

@@ -6,8 +6,6 @@ from enum import Enum
from functools import total_ordering
from typing import TYPE_CHECKING, TypeVar
from typing_extensions import override
if TYPE_CHECKING:
from typing import Any, Union
@@ -33,36 +31,33 @@ class Lsn:
self.lsn_int = (int(left, 16) << 32) + int(right, 16)
assert 0 <= self.lsn_int <= 0xFFFFFFFF_FFFFFFFF
@override
def __str__(self) -> str:
"""Convert lsn from int to standard hex notation."""
return f"{(self.lsn_int >> 32):X}/{(self.lsn_int & 0xFFFFFFFF):X}"
@override
def __repr__(self) -> str:
return f'Lsn("{str(self)}")'
def __int__(self) -> int:
return self.lsn_int
def __lt__(self, other: object) -> bool:
def __lt__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
return NotImplemented
return self.lsn_int < other.lsn_int
def __gt__(self, other: object) -> bool:
def __gt__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
raise NotImplementedError
return self.lsn_int > other.lsn_int
@override
def __eq__(self, other: object) -> bool:
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
return NotImplemented
return self.lsn_int == other.lsn_int
# Returns the difference between two Lsns, in bytes
def __sub__(self, other: object) -> int:
def __sub__(self, other: Any) -> int:
if not isinstance(other, Lsn):
return NotImplemented
return self.lsn_int - other.lsn_int
@@ -75,7 +70,6 @@ class Lsn:
else:
raise NotImplementedError
@override
def __hash__(self) -> int:
return hash(self.lsn_int)
@@ -122,22 +116,19 @@ class Id:
self.id = bytearray.fromhex(x)
assert len(self.id) == 16
@override
def __str__(self) -> str:
return self.id.hex()
def __lt__(self, other: object) -> bool:
def __lt__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self.id < other.id
@override
def __eq__(self, other: object) -> bool:
def __eq__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self.id == other.id
@override
def __hash__(self) -> int:
return hash(str(self.id))
@@ -148,31 +139,25 @@ class Id:
class TenantId(Id):
@override
def __repr__(self) -> str:
return f'`TenantId("{self.id.hex()}")'
@override
def __str__(self) -> str:
return self.id.hex()
class NodeId(Id):
@override
def __repr__(self) -> str:
return f'`NodeId("{self.id.hex()}")'
@override
def __str__(self) -> str:
return self.id.hex()
class TimelineId(Id):
@override
def __repr__(self) -> str:
return f'TimelineId("{self.id.hex()}")'
@override
def __str__(self) -> str:
return self.id.hex()
@@ -202,7 +187,7 @@ class TenantShardId:
assert self.shard_number < self.shard_count or self.shard_count == 0
@classmethod
def parse(cls: type[TTenantShardId], input: str) -> TTenantShardId:
def parse(cls: type[TTenantShardId], input) -> TTenantShardId:
if len(input) == 32:
return cls(
tenant_id=TenantId(input),
@@ -218,7 +203,6 @@ class TenantShardId:
else:
raise ValueError(f"Invalid TenantShardId '{input}'")
@override
def __str__(self):
if self.shard_count > 0:
return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"
@@ -226,25 +210,22 @@ class TenantShardId:
# Unsharded case: equivalent of Rust TenantShardId::unsharded(tenant_id)
return str(self.tenant_id)
@override
def __repr__(self):
return self.__str__()
def _tuple(self) -> tuple[TenantId, int, int]:
return (self.tenant_id, self.shard_number, self.shard_count)
def __lt__(self, other: object) -> bool:
def __lt__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self._tuple() < other._tuple()
@override
def __eq__(self, other: object) -> bool:
def __eq__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self._tuple() == other._tuple()
@override
def __hash__(self) -> int:
return hash(self._tuple())

View File

@@ -8,11 +8,9 @@ from contextlib import _GeneratorContextManager, contextmanager
# Type-related stuff
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from _pytest.fixtures import FixtureRequest
from typing_extensions import override
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
@@ -26,9 +24,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pg_stats import PgStatTable
if TYPE_CHECKING:
from collections.abc import Iterator
class PgCompare(ABC):
"""Common interface of all postgres implementations, useful for benchmarks.
@@ -70,12 +65,12 @@ class PgCompare(ABC):
@contextmanager
@abstractmethod
def record_pageserver_writes(self, out_name: str):
def record_pageserver_writes(self, out_name):
pass
@contextmanager
@abstractmethod
def record_duration(self, out_name: str):
def record_duration(self, out_name):
pass
@contextmanager
@@ -127,34 +122,28 @@ class NeonCompare(PgCompare):
self._pg = self.env.endpoints.create_start("main", "main", self.tenant)
@property
@override
def pg(self) -> PgProtocol:
return self._pg
@property
@override
def zenbenchmark(self) -> NeonBenchmarker:
return self._zenbenchmark
@property
@override
def pg_bin(self) -> PgBin:
return self._pg_bin
@override
def flush(self, compact: bool = True, gc: bool = True):
wait_for_last_flush_lsn(self.env, self._pg, self.tenant, self.timeline)
self.pageserver_http_client.timeline_checkpoint(self.tenant, self.timeline, compact=compact)
if gc:
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
@override
def compact(self):
self.pageserver_http_client.timeline_compact(
self.tenant, self.timeline, wait_until_uploaded=True
)
@override
def report_peak_memory_use(self):
self.zenbenchmark.record(
"peak_mem",
@@ -163,7 +152,6 @@ class NeonCompare(PgCompare):
report=MetricReport.LOWER_IS_BETTER,
)
@override
def report_size(self):
timeline_size = self.zenbenchmark.get_timeline_size(
self.env.repo_dir, self.tenant, self.timeline
@@ -197,11 +185,9 @@ class NeonCompare(PgCompare):
"num_files_uploaded", total_files, "", report=MetricReport.LOWER_IS_BETTER
)
@override
def record_pageserver_writes(self, out_name: str) -> _GeneratorContextManager[None]:
return self.zenbenchmark.record_pageserver_writes(self.env.pageserver, out_name)
@override
def record_duration(self, out_name: str) -> _GeneratorContextManager[None]:
return self.zenbenchmark.record_duration(out_name)
@@ -225,33 +211,26 @@ class VanillaCompare(PgCompare):
self.cur = self.conn.cursor()
@property
@override
def pg(self) -> VanillaPostgres:
return self._pg
@property
@override
def zenbenchmark(self) -> NeonBenchmarker:
return self._zenbenchmark
@property
@override
def pg_bin(self) -> PgBin:
return self._pg.pg_bin
@override
def flush(self, compact: bool = False, gc: bool = False):
self.cur.execute("checkpoint")
@override
def compact(self):
pass
@override
def report_peak_memory_use(self):
pass # TODO find something
@override
def report_size(self):
data_size = self.pg.get_subdir_size(Path("base"))
self.zenbenchmark.record(
@@ -266,7 +245,6 @@ class VanillaCompare(PgCompare):
def record_pageserver_writes(self, out_name: str) -> Iterator[None]:
yield # Do nothing
@override
def record_duration(self, out_name: str) -> _GeneratorContextManager[None]:
return self.zenbenchmark.record_duration(out_name)
@@ -283,35 +261,28 @@ class RemoteCompare(PgCompare):
self.cur = self.conn.cursor()
@property
@override
def pg(self) -> PgProtocol:
return self._pg
@property
@override
def zenbenchmark(self) -> NeonBenchmarker:
return self._zenbenchmark
@property
@override
def pg_bin(self) -> PgBin:
return self._pg.pg_bin
@override
def flush(self, compact: bool = False, gc: bool = False):
def flush(self):
# TODO: flush the remote pageserver
pass
@override
def compact(self):
pass
@override
def report_peak_memory_use(self):
# TODO: get memory usage from remote pageserver
pass
@override
def report_size(self):
# TODO: get storage size from remote pageserver
pass
@@ -320,7 +291,6 @@ class RemoteCompare(PgCompare):
def record_pageserver_writes(self, out_name: str) -> Iterator[None]:
yield # Do nothing
@override
def record_duration(self, out_name: str) -> _GeneratorContextManager[None]:
return self.zenbenchmark.record_duration(out_name)

View File

@@ -1,31 +1,27 @@
from __future__ import annotations
import concurrent.futures
from typing import TYPE_CHECKING
from typing import Any
import pytest
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.common_types import TenantId
from fixtures.log_helper import log
if TYPE_CHECKING:
from typing import Any, Callable, Optional
class ComputeReconfigure:
def __init__(self, server: HTTPServer):
def __init__(self, server):
self.server = server
self.control_plane_compute_hook_api = f"http://{server.host}:{server.port}/notify-attach"
self.workloads: dict[TenantId, Any] = {}
self.on_notify: Optional[Callable[[Any], None]] = None
self.workloads = {}
self.on_notify = None
def register_workload(self, workload: Any):
def register_workload(self, workload):
self.workloads[workload.tenant_id] = workload
def register_on_notify(self, fn: Optional[Callable[[Any], None]]):
def register_on_notify(self, fn):
"""
Add some extra work during a notification, like sleeping to slow things down, or
logging what was notified.
@@ -34,7 +30,7 @@ class ComputeReconfigure:
@pytest.fixture(scope="function")
def compute_reconfigure_listener(make_httpserver: HTTPServer):
def compute_reconfigure_listener(make_httpserver):
"""
This fixture exposes an HTTP listener for the storage controller to submit
compute notifications to us, instead of updating neon_local endpoints itself.
@@ -52,7 +48,7 @@ def compute_reconfigure_listener(make_httpserver: HTTPServer):
# accept a healthy rate of calls into notify-attach.
reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def handler(request: Request) -> Response:
def handler(request: Request):
assert request.json is not None
body: dict[str, Any] = request.json
log.info(f"notify-attach request: {body}")

View File

@@ -14,10 +14,8 @@ from allure_pytest.utils import allure_name, allure_suite_labels
from fixtures.log_helper import log
if TYPE_CHECKING:
from collections.abc import MutableMapping
from typing import Any
"""
The plugin reruns flaky tests.
It uses `pytest.mark.flaky` provided by `pytest-rerunfailures` plugin and flaky tests detected by `scripts/flaky_tests.py`

View File

@@ -1,15 +1,8 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from pytest_httpserver import HTTPServer
if TYPE_CHECKING:
from collections.abc import Iterator
from fixtures.port_distributor import PortDistributor
# TODO: mypy fails with:
# Module "fixtures.neon_fixtures" does not explicitly export attribute "PortDistributor" [attr-defined]
# from fixtures.neon_fixtures import PortDistributor
@@ -24,7 +17,7 @@ def httpserver_ssl_context():
@pytest.fixture(scope="function")
def make_httpserver(httpserver_listen_address, httpserver_ssl_context) -> Iterator[HTTPServer]:
def make_httpserver(httpserver_listen_address, httpserver_ssl_context):
host, port = httpserver_listen_address
if not host:
host = HTTPServer.DEFAULT_LISTEN_HOST
@@ -40,13 +33,13 @@ def make_httpserver(httpserver_listen_address, httpserver_ssl_context) -> Iterat
@pytest.fixture(scope="function")
def httpserver(make_httpserver: HTTPServer) -> Iterator[HTTPServer]:
def httpserver(make_httpserver):
server = make_httpserver
yield server
server.clear()
@pytest.fixture(scope="function")
def httpserver_listen_address(port_distributor: PortDistributor) -> tuple[str, int]:
def httpserver_listen_address(port_distributor) -> tuple[str, int]:
port = port_distributor.get_port()
return ("localhost", port)

View File

@@ -24,14 +24,25 @@ https://docs.pytest.org/en/6.2.x/logging.html
# log format is specified in pytest.ini file
LOGGING = {
"version": 1,
"filters": {
"wzfilter": {
"()": "fixtures.log_helper_internal.WerkzeugNoiseFilter",
},
},
"loggers": {
"root": {"level": "INFO"},
"root.safekeeper_async": {"level": "INFO"}, # a lot of logs on DEBUG level
# Use a custom filter to make werkzeug's messages less verbose.
"werkzeug": {
"filters": ["wzfilter"],
"level": "INFO",
},
},
}
def getLogger(name: str = "root") -> logging.Logger:
def getLogger(name="root") -> logging.Logger:
"""Method to get logger for tests.
Should be used to get correctly initialized logger."""

View File

@@ -0,0 +1,24 @@
# These are logically part of in log_helper.py, but need to be in a
# different file because these get loaded from the logging config
# file. If you try to included these in log_helper.py directly, you
# get an error about circular dependency.
import re
class WerkzeugNoiseFilter(object):
"""Moto server that we use for mocking S3 uses werkzeug, which
logs all HTTP operations. It constructs log messages like this:
127.0.0.1 - - [08/Oct/2024 12:43:46] "PUT /bucket-name/path?x-id=PutObject HTTP/1.1" 200 -
The IP address is not interesting in tests, as it's always just
127.0.0.1. And the timestamp is redundant with the timestamp we
print for all log messages anyway, with millisecond precision.
Unfortunately those are "etched" in the message, and cannot be
overriden by setting a custom formatter. To reduce the noise in
the test output, this filter removes those fields from the log
messages.
"""
def filter(self, logRecord):
logRecord.msg = re.sub(r'127\.0\.0\.1 - - \[.+\] (".*".*)', r'\1', logRecord.msg)
return True

View File

@@ -22,7 +22,7 @@ class Metrics:
def query_all(self, name: str, filter: Optional[dict[str, str]] = None) -> list[Sample]:
filter = filter or {}
res: list[Sample] = []
res = []
for sample in self.metrics[name]:
try:
@@ -59,7 +59,7 @@ class MetricsGetter:
return results[0].value
def get_metrics_values(
self, names: list[str], filter: Optional[dict[str, str]] = None, absence_ok: bool = False
self, names: list[str], filter: Optional[dict[str, str]] = None, absence_ok=False
) -> dict[str, float]:
"""
When fetching multiple named metrics, it is more efficient to use this

View File

@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, cast
import requests
if TYPE_CHECKING:
from typing import Any, Literal, Optional
from typing import Any, Literal, Optional, Union
from fixtures.pg_version import PgVersion
@@ -25,7 +25,9 @@ class NeonAPI:
self.__neon_api_key = neon_api_key
self.__neon_api_base_url = neon_api_base_url.strip("/")
def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response:
def __request(
self, method: Union[str, bytes], endpoint: str, **kwargs: Any
) -> requests.Response:
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"

View File

@@ -18,6 +18,7 @@ from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from fcntl import LOCK_EX, LOCK_UN, flock
from functools import cached_property
from pathlib import Path
from types import TracebackType
@@ -58,7 +59,6 @@ from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
)
from fixtures.paths import get_test_repo_dir, shared_snapshot_dir
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
@@ -75,8 +75,8 @@ from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
COMPONENT_BINARIES,
allure_add_grafana_links,
allure_attach_from_dir,
assert_no_errors,
get_dir_size,
print_gc_result,
@@ -96,8 +96,6 @@ if TYPE_CHECKING:
Union,
)
from fixtures.paths import SnapshotDirLocked
T = TypeVar("T")
@@ -120,11 +118,65 @@ put directly-importable functions into utils.py or another separate file.
Env = dict[str, str]
DEFAULT_OUTPUT_DIR: str = "test_output"
DEFAULT_BRANCH_NAME: str = "main"
BASE_PORT: int = 15000
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = Path(__file__).parents[2]
log.info(f"base_dir is {base_dir}")
yield base_dir
@pytest.fixture(scope="function")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath
@pytest.fixture(scope="session")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
@@ -317,14 +369,11 @@ class NeonEnvBuilder:
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
neon_binpath: Path,
compatibility_neon_binpath: Path,
pg_distrib_dir: Path,
compatibility_pg_distrib_dir: Path,
pg_version: PgVersion,
test_name: str,
top_output_dir: Path,
test_output_dir: Path,
combination,
test_overlay_dir: Optional[Path] = None,
pageserver_remote_storage: Optional[RemoteStorage] = None,
# toml that will be decomposed into `--config-override` flags during `pageserver --init`
@@ -406,19 +455,6 @@ class NeonEnvBuilder:
"test_"
), "Unexpectedly instantiated from outside a test function"
self.test_name = test_name
self.compatibility_neon_binpath = compatibility_neon_binpath
self.compatibility_pg_distrib_dir = compatibility_pg_distrib_dir
self.version_combination = combination
self.mixdir = self.test_output_dir / "mixdir_neon"
if self.version_combination is not None:
assert (
self.compatibility_neon_binpath is not None
), "the environment variable COMPATIBILITY_NEON_BIN is required when using mixed versions"
assert (
self.compatibility_pg_distrib_dir is not None
), "the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required when using mixed versions"
self.mixdir.mkdir(mode=0o755, exist_ok=True)
self._mix_versions()
def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -619,21 +655,6 @@ class NeonEnvBuilder:
return self.env
def _mix_versions(self):
assert self.version_combination is not None, "version combination must be set"
for component, paths in COMPONENT_BINARIES.items():
directory = (
self.neon_binpath
if self.version_combination[component] == "new"
else self.compatibility_neon_binpath
)
for filename in paths:
destination = self.mixdir / filename
destination.symlink_to(directory / filename)
if self.version_combination["compute"] == "old":
self.pg_distrib_dir = self.compatibility_pg_distrib_dir
self.neon_binpath = self.mixdir
def overlay_mount(self, ident: str, srcdir: Path, dstdir: Path):
"""
Mount `srcdir` as an overlayfs mount at `dstdir`.
@@ -1382,9 +1403,7 @@ def neon_simple_env(
top_output_dir: Path,
test_output_dir: Path,
neon_binpath: Path,
compatibility_neon_binpath: Path,
pg_distrib_dir: Path,
compatibility_pg_distrib_dir: Path,
pg_version: PgVersion,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
@@ -1399,11 +1418,6 @@ def neon_simple_env(
# Create the environment in the per-test output directory
repo_dir = get_test_repo_dir(request, top_output_dir)
combination = (
request._pyfuncitem.callspec.params["combination"]
if "combination" in request._pyfuncitem.callspec.params
else None
)
with NeonEnvBuilder(
top_output_dir=top_output_dir,
@@ -1411,9 +1425,7 @@ def neon_simple_env(
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
compatibility_neon_binpath=compatibility_neon_binpath,
pg_distrib_dir=pg_distrib_dir,
compatibility_pg_distrib_dir=compatibility_pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
@@ -1423,7 +1435,6 @@ def neon_simple_env(
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
combination=combination,
) as builder:
env = builder.init_start()
@@ -1437,9 +1448,7 @@ def neon_env_builder(
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
neon_binpath: Path,
compatibility_neon_binpath: Path,
pg_distrib_dir: Path,
compatibility_pg_distrib_dir: Path,
pg_version: PgVersion,
run_id: uuid.UUID,
request: FixtureRequest,
@@ -1466,11 +1475,6 @@ def neon_env_builder(
# Create the environment in the test-specific output dir
repo_dir = os.path.join(test_output_dir, "repo")
combination = (
request._pyfuncitem.callspec.params["combination"]
if "combination" in request._pyfuncitem.callspec.params
else None
)
# Return the builder to the caller
with NeonEnvBuilder(
@@ -1479,10 +1483,7 @@ def neon_env_builder(
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
compatibility_neon_binpath=compatibility_neon_binpath,
pg_distrib_dir=pg_distrib_dir,
compatibility_pg_distrib_dir=compatibility_pg_distrib_dir,
combination=combination,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
@@ -1986,11 +1987,11 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"reconcile_all waited for {n} shards")
return n
def reconcile_until_idle(self, timeout_secs=30, max_interval=5):
def reconcile_until_idle(self, timeout_secs=30):
start_at = time.time()
n = 1
delay_sec = 0.1
delay_max = max_interval
delay_sec = 0.5
delay_max = 5
while n > 0:
n = self.reconcile_all()
if n == 0:
@@ -3656,7 +3657,7 @@ class Endpoint(PgProtocol, LogUtils):
config_lines: Optional[list[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
allow_multiple: bool = False,
allow_multiple=False,
basebackup_request_tries: Optional[int] = None,
) -> Endpoint:
"""
@@ -3997,7 +3998,7 @@ class Safekeeper(LogUtils):
def timeline_dir(self, tenant_id, timeline_id) -> Path:
return self.data_dir / str(tenant_id) / str(timeline_id)
# list partial uploaded segments of this safekeeper. Works only for
# List partial uploaded segments of this safekeeper. Works only for
# RemoteStorageKind.LOCAL_FS.
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
tline_path = (
@@ -4245,6 +4246,44 @@ class StorageScrubber:
raise
def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) -> Path:
"""Compute the path to a working directory for an individual test."""
test_name = request.node.name
test_dir = top_output_dir / f"{prefix}{test_name.replace('/', '-')}"
# We rerun flaky tests multiple times, use a separate directory for each run.
if (suffix := getattr(request.node, "execution_count", None)) is not None:
test_dir = test_dir.parent / f"{test_dir.name}-{suffix}"
log.info(f"get_test_output_dir is {test_dir}")
# make mypy happy
assert isinstance(test_dir, Path)
return test_dir
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
The working directory for a test.
"""
return _get_test_dir(request, top_output_dir, "")
def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
Directory that contains `upperdir` and `workdir` for overlayfs mounts
that a test creates. See `NeonEnvBuilder.overlay_mount`.
"""
return _get_test_dir(request, top_output_dir, "overlay-")
def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
return top_output_dir / "shared-snapshots" / snapshot_name
def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return get_test_output_dir(request, top_output_dir) / "repo"
def pytest_addoption(parser: Parser):
parser.addoption(
"--preserve-database-files",
@@ -4254,11 +4293,154 @@ def pytest_addoption(parser: Parser):
)
SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile(
SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
r"config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)"
)
# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="function", autouse=True)
def test_output_dir(
request: FixtureRequest, top_output_dir: Path, test_overlay_dir: Path
) -> Iterator[Path]:
"""Create the working directory for an individual test."""
# one directory per test
test_dir = get_test_output_dir(request, top_output_dir)
log.info(f"test_output_dir is {test_dir}")
shutil.rmtree(test_dir, ignore_errors=True)
test_dir.mkdir()
yield test_dir
# Allure artifacts creation might involve the creation of `.tar.zst` archives,
# which aren't going to be used if Allure results collection is not enabled
# (i.e. --alluredir is not set).
# Skip `allure_attach_from_dir` in this case
if not request.config.getoption("--alluredir"):
return
preserve_database_files = False
for k, v in request.node.user_properties:
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
# So, neon_env_builder's cleanup runs before here.
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
if k == "preserve_database_files":
assert isinstance(v, bool)
preserve_database_files = v
allure_attach_from_dir(test_dir, preserve_database_files)
class FileAndThreadLock:
def __init__(self, path: Path):
self.path = path
self.thread_lock = threading.Lock()
self.fd: Optional[int] = None
def __enter__(self):
self.fd = os.open(self.path, os.O_CREAT | os.O_WRONLY)
# lock thread lock before file lock so that there's no race
# around flocking / funlocking the file lock
self.thread_lock.acquire()
flock(self.fd, LOCK_EX)
def __exit__(self, exc_type, exc_value, exc_traceback):
assert self.fd is not None
assert self.thread_lock.locked() # ... by us
flock(self.fd, LOCK_UN)
self.thread_lock.release()
os.close(self.fd)
self.fd = None
class SnapshotDirLocked:
def __init__(self, parent: SnapshotDir):
self._parent = parent
def is_initialized(self):
# TODO: in the future, take a `tag` as argument and store it in the marker in set_initialized.
# Then, in this function, compare marker file contents with the tag to invalidate the snapshot if the tag changed.
return self._parent._marker_file_path.exists()
def set_initialized(self):
self._parent._marker_file_path.write_text("")
@property
def path(self) -> Path:
return self._parent._path / "snapshot"
class SnapshotDir:
_path: Path
def __init__(self, path: Path):
self._path = path
assert self._path.is_dir()
self._lock = FileAndThreadLock(self._lock_file_path)
@property
def _lock_file_path(self) -> Path:
return self._path / "initializing.flock"
@property
def _marker_file_path(self) -> Path:
return self._path / "initialized.marker"
def __enter__(self) -> SnapshotDirLocked:
self._lock.__enter__()
return SnapshotDirLocked(self)
def __exit__(self, exc_type, exc_value, exc_traceback):
self._lock.__exit__(exc_type, exc_value, exc_traceback)
def shared_snapshot_dir(top_output_dir, ident: str) -> SnapshotDir:
snapshot_dir_path = get_shared_snapshot_dir_path(top_output_dir, ident)
snapshot_dir_path.mkdir(exist_ok=True, parents=True)
return SnapshotDir(snapshot_dir_path)
@pytest.fixture(scope="function")
def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
"""
Idempotently create a test's overlayfs mount state directory.
If the functionality isn't enabled via env var, returns None.
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
"""
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
return None
overlay_dir = get_test_overlay_dir(request, top_output_dir)
log.info(f"test_overlay_dir is {overlay_dir}")
overlay_dir.mkdir(exist_ok=True)
# unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
cmd = ["sudo", "umount", str(mountpoint)]
log.info(
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
)
subprocess.run(cmd, capture_output=True, check=True)
# the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work.
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
subprocess.run(cmd, capture_output=True, check=True)
overlay_dir.mkdir()
return overlay_dir
# no need to clean up anything: on clean shutdown,
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
# and on unclean shutdown, this function will take care of it
# on the next test run
SKIP_DIRS = frozenset(
(
"pg_wal",
@@ -4280,7 +4462,6 @@ SKIP_FILES = frozenset(
"postmaster.opts",
"postmaster.pid",
"pg_control",
"pg_dynshmem",
)
)

View File

@@ -1,13 +1,10 @@
from __future__ import annotations
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING
import psutil
if TYPE_CHECKING:
from collections.abc import Iterator
def iter_mounts_beneath(topdir: Path) -> Iterator[Path]:
"""

View File

@@ -886,7 +886,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
self,
tenant_id: Union[TenantId, TenantShardId],
timeline_id: TimelineId,
batch_size: Optional[int] = None,
batch_size: int | None = None,
**kwargs,
) -> set[TimelineId]:
params = {}

View File

@@ -9,12 +9,7 @@ import toml
from _pytest.python import Metafunc
from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from typing import Any, Optional
from fixtures.utils import AuxFileStore
from fixtures.utils import AuxFileStore
if TYPE_CHECKING:
from typing import Any, Optional

View File

@@ -1,312 +0,0 @@
from __future__ import annotations
import os
import shutil
import subprocess
import threading
from fcntl import LOCK_EX, LOCK_UN, flock
from pathlib import Path
from types import TracebackType
from typing import TYPE_CHECKING
import pytest
from pytest import FixtureRequest
from fixtures import overlayfs
from fixtures.log_helper import log
from fixtures.utils import allure_attach_from_dir
if TYPE_CHECKING:
from collections.abc import Iterator
from typing import Optional
DEFAULT_OUTPUT_DIR: str = "test_output"
def get_test_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
"""Compute the path to a working directory for an individual test."""
test_name = request.node.name
test_dir = top_output_dir / f"{prefix or ''}{test_name.replace('/', '-')}"
# We rerun flaky tests multiple times, use a separate directory for each run.
if (suffix := getattr(request.node, "execution_count", None)) is not None:
test_dir = test_dir.parent / f"{test_dir.name}-{suffix}"
return test_dir
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
The working directory for a test.
"""
return get_test_dir(request, top_output_dir)
def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
Directory that contains `upperdir` and `workdir` for overlayfs mounts
that a test creates. See `NeonEnvBuilder.overlay_mount`.
"""
return get_test_dir(request, top_output_dir, "overlay-")
def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
return top_output_dir / "shared-snapshots" / snapshot_name
def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return get_test_output_dir(request, top_output_dir) / "repo"
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = Path(__file__).parents[2]
log.info(f"base_dir is {base_dir}")
yield base_dir
@pytest.fixture(scope="session")
def compute_config_dir(base_dir: Path) -> Iterator[Path]:
"""
Retrieve the path to the compute configuration directory.
"""
yield base_dir / "compute" / "etc"
@pytest.fixture(scope="function")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath.absolute()
@pytest.fixture(scope="session")
def compatibility_snapshot_dir() -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
return
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
), "COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg(PG_VERSION)` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
yield compatibility_snapshot_dir
@pytest.fixture(scope="session")
def compatibility_neon_binpath() -> Optional[Iterator[Path]]:
if os.getenv("REMOTE_ENV"):
return
comp_binpath = None
if env_compatibility_neon_binpath := os.environ.get("COMPATIBILITY_NEON_BIN"):
comp_binpath = Path(env_compatibility_neon_binpath).resolve().absolute()
yield comp_binpath
@pytest.fixture(scope="session")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def compatibility_pg_distrib_dir() -> Optional[Iterator[Path]]:
compat_distrib_dir = None
if env_compat_postgres_bin := os.environ.get("COMPATIBILITY_POSTGRES_DISTRIB_DIR"):
compat_distrib_dir = Path(env_compat_postgres_bin).resolve()
if not compat_distrib_dir.exists():
raise Exception(f"compatibility postgres directory not found at {compat_distrib_dir}")
if compat_distrib_dir:
log.info(f"compatibility_pg_distrib_dir is {compat_distrib_dir}")
yield compat_distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="function", autouse=True)
def test_output_dir(request: pytest.FixtureRequest, top_output_dir: Path) -> Iterator[Path]:
"""Create the working directory for an individual test."""
# one directory per test
test_dir = get_test_output_dir(request, top_output_dir)
log.info(f"test_output_dir is {test_dir}")
shutil.rmtree(test_dir, ignore_errors=True)
test_dir.mkdir()
yield test_dir
# Allure artifacts creation might involve the creation of `.tar.zst` archives,
# which aren't going to be used if Allure results collection is not enabled
# (i.e. --alluredir is not set).
# Skip `allure_attach_from_dir` in this case
if not request.config.getoption("--alluredir"):
return
preserve_database_files = False
for k, v in request.node.user_properties:
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
# So, neon_env_builder's cleanup runs before here.
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
if k == "preserve_database_files":
assert isinstance(v, bool)
preserve_database_files = v
allure_attach_from_dir(test_dir, preserve_database_files)
class FileAndThreadLock:
def __init__(self, path: Path):
self.path = path
self.thread_lock = threading.Lock()
self.fd: Optional[int] = None
def __enter__(self):
self.fd = os.open(self.path, os.O_CREAT | os.O_WRONLY)
# lock thread lock before file lock so that there's no race
# around flocking / funlocking the file lock
self.thread_lock.acquire()
flock(self.fd, LOCK_EX)
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
exc_traceback: Optional[TracebackType],
):
assert self.fd is not None
assert self.thread_lock.locked() # ... by us
flock(self.fd, LOCK_UN)
self.thread_lock.release()
os.close(self.fd)
self.fd = None
class SnapshotDirLocked:
def __init__(self, parent: SnapshotDir):
self._parent = parent
def is_initialized(self):
# TODO: in the future, take a `tag` as argument and store it in the marker in set_initialized.
# Then, in this function, compare marker file contents with the tag to invalidate the snapshot if the tag changed.
return self._parent.marker_file_path.exists()
def set_initialized(self):
self._parent.marker_file_path.write_text("")
@property
def path(self) -> Path:
return self._parent.path / "snapshot"
class SnapshotDir:
_path: Path
def __init__(self, path: Path):
self._path = path
assert self._path.is_dir()
self._lock = FileAndThreadLock(self.lock_file_path)
@property
def path(self) -> Path:
return self._path
@property
def lock_file_path(self) -> Path:
return self._path / "initializing.flock"
@property
def marker_file_path(self) -> Path:
return self._path / "initialized.marker"
def __enter__(self) -> SnapshotDirLocked:
self._lock.__enter__()
return SnapshotDirLocked(self)
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
exc_traceback: Optional[TracebackType],
):
self._lock.__exit__(exc_type, exc_value, exc_traceback)
def shared_snapshot_dir(top_output_dir: Path, ident: str) -> SnapshotDir:
snapshot_dir_path = get_shared_snapshot_dir_path(top_output_dir, ident)
snapshot_dir_path.mkdir(exist_ok=True, parents=True)
return SnapshotDir(snapshot_dir_path)
@pytest.fixture(scope="function")
def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
"""
Idempotently create a test's overlayfs mount state directory.
If the functionality isn't enabled via env var, returns None.
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
"""
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
return None
overlay_dir = get_test_overlay_dir(request, top_output_dir)
log.info(f"test_overlay_dir is {overlay_dir}")
overlay_dir.mkdir(exist_ok=True)
# unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
cmd = ["sudo", "umount", str(mountpoint)]
log.info(
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
)
subprocess.run(cmd, capture_output=True, check=True)
# the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work.
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
subprocess.run(cmd, capture_output=True, check=True)
overlay_dir.mkdir()
return overlay_dir
# no need to clean up anything: on clean shutdown,
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
# and on unclean shutdown, this function will take care of it
# on the next test run

View File

@@ -2,14 +2,9 @@ from __future__ import annotations
import enum
import os
from typing import TYPE_CHECKING
from typing import Optional
import pytest
from typing_extensions import override
if TYPE_CHECKING:
from typing import Optional
"""
This fixture is used to determine which version of Postgres to use for tests.
@@ -29,12 +24,10 @@ class PgVersion(str, enum.Enum):
NOT_SET = "<-POSTRGRES VERSION IS NOT SET->"
# Make it less confusing in logs
@override
def __repr__(self) -> str:
return f"'{self.value}'"
# Make this explicit for Python 3.11 compatibility, which changes the behavior of enums
@override
def __str__(self) -> str:
return self.value
@@ -45,8 +38,7 @@ class PgVersion(str, enum.Enum):
return f"v{self.value}"
@classmethod
@override
def _missing_(cls, value: object) -> Optional[PgVersion]:
def _missing_(cls, value) -> Optional[PgVersion]:
known_values = {v.value for _, v in cls.__members__.items()}
# Allow passing version as a string with "v" prefix (e.g. "v14")

View File

@@ -59,7 +59,10 @@ class PortDistributor:
if isinstance(value, int):
return self._replace_port_int(value)
return self._replace_port_str(value)
if isinstance(value, str):
return self._replace_port_str(value)
raise TypeError(f"unsupported type {type(value)} of {value=}")
def _replace_port_int(self, value: int) -> int:
known_port = self.port_map.get(value)
@@ -72,7 +75,7 @@ class PortDistributor:
# Use regex to find port in a string
# urllib.parse.urlparse produces inconvenient results for cases without scheme like "localhost:5432"
# See https://bugs.python.org/issue27657
ports: list[str] = re.findall(r":(\d+)(?:/|$)", value)
ports = re.findall(r":(\d+)(?:/|$)", value)
assert len(ports) == 1, f"can't find port in {value}"
port_int = int(ports[0])

View File

@@ -13,7 +13,6 @@ import boto3
import toml
from moto.server import ThreadedMotoServer
from mypy_boto3_s3 import S3Client
from typing_extensions import override
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
@@ -37,7 +36,6 @@ class RemoteStorageUser(str, enum.Enum):
EXTENSIONS = "ext"
SAFEKEEPER = "safekeeper"
@override
def __str__(self) -> str:
return self.value
@@ -83,13 +81,11 @@ class LocalFsStorage:
def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)
def timeline_latest_generation(
self, tenant_id: TenantId, timeline_id: TimelineId
) -> Optional[int]:
def timeline_latest_generation(self, tenant_id, timeline_id):
timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id))
index_parts = [f for f in timeline_files if f.startswith("index_part")]
def parse_gen(filename: str) -> Optional[int]:
def parse_gen(filename):
log.info(f"parsing index_part '{filename}'")
parts = filename.split("-")
if len(parts) == 2:
@@ -97,7 +93,7 @@ class LocalFsStorage:
else:
return None
generations = sorted([parse_gen(f) for f in index_parts]) # type: ignore
generations = sorted([parse_gen(f) for f in index_parts])
if len(generations) == 0:
raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}")
return generations[-1]
@@ -126,14 +122,14 @@ class LocalFsStorage:
filename = f"{local_name}-{generation:08x}"
return self.timeline_path(tenant_id, timeline_id) / filename
def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any:
def index_content(self, tenant_id: TenantId, timeline_id: TimelineId):
with self.index_path(tenant_id, timeline_id).open("r") as f:
return json.load(f)
def heatmap_path(self, tenant_id: TenantId) -> Path:
return self.tenant_path(tenant_id) / TENANT_HEATMAP_FILE_NAME
def heatmap_content(self, tenant_id: TenantId) -> Any:
def heatmap_content(self, tenant_id):
with self.heatmap_path(tenant_id).open("r") as f:
return json.load(f)
@@ -301,7 +297,7 @@ class S3Storage:
def heatmap_key(self, tenant_id: TenantId) -> str:
return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"
def heatmap_content(self, tenant_id: TenantId) -> Any:
def heatmap_content(self, tenant_id: TenantId):
r = self.client.get_object(Bucket=self.bucket_name, Key=self.heatmap_key(tenant_id))
return json.loads(r["Body"].read().decode("utf-8"))
@@ -321,7 +317,7 @@ class RemoteStorageKind(str, enum.Enum):
def configure(
self,
repo_dir: Path,
mock_s3_server: MockS3Server,
mock_s3_server,
run_id: str,
test_name: str,
user: RemoteStorageUser,
@@ -455,9 +451,15 @@ def default_remote_storage() -> RemoteStorageKind:
def remote_storage_to_toml_dict(remote_storage: RemoteStorage) -> dict[str, Any]:
if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):
raise Exception("invalid remote storage type")
return remote_storage.to_toml_dict()
# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):
raise Exception("invalid remote storage type")
return remote_storage.to_toml_inline_table()

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from typing import Any, Optional
import pytest
import requests
@@ -12,9 +12,6 @@ from werkzeug.wrappers.response import Response
from fixtures.log_helper import log
if TYPE_CHECKING:
from typing import Any, Optional
class StorageControllerProxy:
def __init__(self, server: HTTPServer):
@@ -37,7 +34,7 @@ def proxy_request(method: str, url: str, **kwargs) -> requests.Response:
@pytest.fixture(scope="function")
def storage_controller_proxy(make_httpserver: HTTPServer):
def storage_controller_proxy(make_httpserver):
"""
Proxies requests into the storage controller to the currently
selected storage controller instance via `StorageControllerProxy.route_to`.
@@ -51,7 +48,7 @@ def storage_controller_proxy(make_httpserver: HTTPServer):
log.info(f"Storage controller proxy listening on {self.listen}")
def handler(request: Request) -> Response:
def handler(request: Request):
if self.route_to is None:
log.info(f"Storage controller proxy has no routing configured for {request.url}")
return Response("Routing not configured", status=503)

View File

@@ -18,7 +18,6 @@ from urllib.parse import urlencode
import allure
import zstandard
from psycopg2.extensions import cursor
from typing_extensions import override
from fixtures.log_helper import log
from fixtures.pageserver.common_types import (
@@ -27,45 +26,28 @@ from fixtures.pageserver.common_types import (
)
if TYPE_CHECKING:
from collections.abc import Iterable
from typing import IO, Optional
from typing import (
IO,
Optional,
Union,
)
from fixtures.common_types import TimelineId
from fixtures.neon_fixtures import PgBin
WaitUntilRet = TypeVar("WaitUntilRet")
from fixtures.common_types import TimelineId
Fn = TypeVar("Fn", bound=Callable[..., Any])
COMPONENT_BINARIES = {
"storage_controller": ("storage_controller",),
"storage_broker": ("storage_broker",),
"compute": ("compute_ctl",),
"safekeeper": ("safekeeper",),
"pageserver": ("pageserver", "pagectl"),
}
# Disable auto-formatting for better readability
# fmt: off
VERSIONS_COMBINATIONS = (
{"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"},
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"},
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"},
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"},
{"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"},
)
# fmt: on
def subprocess_capture(
capture_dir: Path,
cmd: list[str],
*,
check: bool = False,
echo_stderr: bool = False,
echo_stdout: bool = False,
capture_stdout: bool = False,
timeout: Optional[float] = None,
with_command_header: bool = True,
check=False,
echo_stderr=False,
echo_stdout=False,
capture_stdout=False,
timeout=None,
with_command_header=True,
**popen_kwargs: Any,
) -> tuple[str, Optional[str], int]:
"""Run a process and bifurcate its output to files and the `log` logger
@@ -102,7 +84,6 @@ def subprocess_capture(
self.capture = capture
self.captured = ""
@override
def run(self):
first = with_command_header
for line in self.in_file:
@@ -184,10 +165,10 @@ def global_counter() -> int:
def print_gc_result(row: dict[str, Any]):
log.info("GC duration {elapsed} ms".format_map(row))
log.info(
(
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}"
" needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
).format_map(row)
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}"
" needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}".format_map(
row
)
)
@@ -245,7 +226,7 @@ def get_scale_for_db(size_mb: int) -> int:
return round(0.06689 * size_mb - 0.5)
ATTACHMENT_NAME_REGEX: re.Pattern[str] = re.compile(
ATTACHMENT_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
r"regression\.(diffs|out)|.+\.(?:log|stderr|stdout|filediff|metrics|html|walredo)"
)
@@ -308,7 +289,7 @@ LOGS_STAGING_DATASOURCE_ID = "xHHYY0dVz"
def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int, end_ms: int):
"""Add links to server logs in Grafana to Allure report"""
links: dict[str, str] = {}
links = {}
# We expect host to be in format like ep-divine-night-159320.us-east-2.aws.neon.build
endpoint_id, region_id, _ = host.split(".", 2)
@@ -360,7 +341,7 @@ def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int,
def start_in_background(
command: list[str], cwd: Path, log_file_name: str, is_started: Callable[[], WaitUntilRet]
command: list[str], cwd: Path, log_file_name: str, is_started: Fn
) -> subprocess.Popen[bytes]:
"""Starts a process, creates the logfile and redirects stderr and stdout there. Runs the start checks before the process is started, or errors."""
@@ -395,11 +376,14 @@ def start_in_background(
return spawned_process
WaitUntilRet = TypeVar("WaitUntilRet")
def wait_until(
number_of_iterations: int,
interval: float,
func: Callable[[], WaitUntilRet],
show_intermediate_error: bool = False,
show_intermediate_error=False,
) -> WaitUntilRet:
"""
Wait until 'func' returns successfully, without exception. Returns the
@@ -480,7 +464,7 @@ def humantime_to_ms(humantime: str) -> float:
def scan_log_for_errors(input: Iterable[str], allowed_errors: list[str]) -> list[tuple[int, str]]:
# FIXME: this duplicates test_runner/fixtures/pageserver/allowed_errors.py
error_or_warn = re.compile(r"\s(ERROR|WARN)")
errors: list[tuple[int, str]] = []
errors = []
for lineno, line in enumerate(input, start=1):
if len(line) == 0:
continue
@@ -500,7 +484,7 @@ def scan_log_for_errors(input: Iterable[str], allowed_errors: list[str]) -> list
return errors
def assert_no_errors(log_file: Path, service: str, allowed_errors: list[str]):
def assert_no_errors(log_file, service, allowed_errors):
if not log_file.exists():
log.warning(f"Skipping {service} log check: {log_file} does not exist")
return
@@ -520,11 +504,9 @@ class AuxFileStore(str, enum.Enum):
V2 = "v2"
CrossValidation = "cross-validation"
@override
def __repr__(self) -> str:
return f"'aux-{self.value}'"
@override
def __str__(self) -> str:
return f"'aux-{self.value}'"
@@ -543,7 +525,7 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: set[str
"""
started_at = time.time()
def hash_extracted(reader: Optional[IO[bytes]]) -> bytes:
def hash_extracted(reader: Union[IO[bytes], None]) -> bytes:
assert reader is not None
digest = sha256(usedforsecurity=False)
while True:
@@ -568,7 +550,7 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: set[str
right_list
), f"unexpected number of files on tar files, {len(left_list)} != {len(right_list)}"
mismatching: set[str] = set()
mismatching = set()
for left_tuple, right_tuple in zip(left_list, right_list):
left_path, left_hash = left_tuple
@@ -593,7 +575,6 @@ class PropagatingThread(threading.Thread):
Simple Thread wrapper with join() propagating the possible exception in the thread.
"""
@override
def run(self):
self.exc = None
try:
@@ -601,8 +582,7 @@ class PropagatingThread(threading.Thread):
except BaseException as e:
self.exc = e
@override
def join(self, timeout: Optional[float] = None) -> Any:
def join(self, timeout=None):
super().join(timeout)
if self.exc:
raise self.exc
@@ -624,19 +604,3 @@ def human_bytes(amt: float) -> str:
amt = amt / 1024
raise RuntimeError("unreachable")
def allpairs_versions():
"""
Returns a dictionary with arguments for pytest parametrize
to test the compatibility with the previous version of Neon components
combinations were pre-computed to test all the pairs of the components with
the different versions.
"""
ids = []
for pair in VERSIONS_COMBINATIONS:
cur_id = []
for component in sorted(pair.keys()):
cur_id.append(pair[component][0])
ids.append(f"combination_{''.join(cur_id)}")
return {"argnames": "combination", "argvalues": VERSIONS_COMBINATIONS, "ids": ids}

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import threading
from typing import TYPE_CHECKING
from typing import Any, Optional
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
@@ -14,9 +14,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.utils import wait_for_last_record_lsn
if TYPE_CHECKING:
from typing import Any, Optional
# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
ENDPOINT_LOCK = threading.Lock()
@@ -103,7 +100,7 @@ class Workload:
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
def write_rows(self, n: int, pageserver_id: Optional[int] = None, upload: bool = True):
def write_rows(self, n, pageserver_id: Optional[int] = None, upload: bool = True):
endpoint = self.endpoint(pageserver_id)
start = self.expect_rows
end = start + n - 1
@@ -124,9 +121,7 @@ class Workload:
else:
return False
def churn_rows(
self, n: int, pageserver_id: Optional[int] = None, upload: bool = True, ingest: bool = True
):
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
assert self.expect_rows >= n
max_iters = 10

View File

@@ -4,10 +4,9 @@ import concurrent.futures
import random
import time
from collections import defaultdict
from enum import Enum
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
@@ -35,7 +34,6 @@ def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[
if tenant_placement[tid]["intent"]["attached"]
== tenant_placement[tid]["observed"]["attached"]
}
assert len(matching) == total_shards
attached_per_node: defaultdict[str, int] = defaultdict(int)
@@ -109,48 +107,15 @@ def test_storage_controller_many_tenants(
ps.allowed_errors.append(".*request was dropped before completing.*")
# Total tenants
small_tenant_count = 7800
large_tenant_count = 200
tenant_count = small_tenant_count + large_tenant_count
large_tenant_shard_count = 8
total_shards = small_tenant_count + large_tenant_count * large_tenant_shard_count
tenant_count = 4000
# A small stripe size to encourage all shards to get some data
stripe_size = 1
# Shards per tenant
shard_count = 2
stripe_size = 1024
# We use a fixed seed to make the test somewhat reproducible: we want a randomly
# chosen order in the sense that it's arbitrary, but not in the sense that it should change every run.
rng = random.Random(1234)
total_shards = tenant_count * shard_count
class Tenant:
def __init__(self):
# Tenants may optionally contain a timeline
self.timeline_id = None
# Tenants may be marked as 'large' to get multiple shard during creation phase
self.large = False
tenant_ids = list(TenantId.generate() for _i in range(0, tenant_count))
tenants = dict((tid, Tenant()) for tid in tenant_ids)
# We will create timelines in only a subset of tenants, because creating timelines
# does many megabytes of IO, and we want to densely simulate huge tenant counts on
# a single test node.
tenant_timelines_count = 100
# These lists are maintained for use with rng.choice
tenants_with_timelines = list(rng.sample(tenants.keys(), tenant_timelines_count))
tenants_without_timelines = list(
tenant_id for tenant_id in tenants if tenant_id not in tenants_with_timelines
)
# For our sharded tenants, we will make half of them with timelines and half without
assert large_tenant_count >= tenant_timelines_count / 2
for tenant_id in tenants_with_timelines[0 : large_tenant_count // 2]:
tenants[tenant_id].large = True
for tenant_id in tenants_without_timelines[0 : large_tenant_count // 2]:
tenants[tenant_id].large = True
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
@@ -160,39 +125,23 @@ def test_storage_controller_many_tenants(
rss = env.storage_controller.get_metric_value("process_resident_memory_bytes")
assert rss is not None
log.info(f"Resident memory: {rss} ({ rss / total_shards} per shard)")
assert rss < expect_memory_per_shard * total_shards
log.info(f"Resident memory: {rss} ({ rss / (shard_count * tenant_count)} per shard)")
assert rss < expect_memory_per_shard * shard_count * tenant_count
# We use a fixed seed to make the test somewhat reproducible: we want a randomly
# chosen order in the sense that it's arbitrary, but not in the sense that it should change every run.
rng = random.Random(1234)
# Issue more concurrent operations than the storage controller's reconciler concurrency semaphore
# permits, to ensure that we are exercising stressing that.
api_concurrency = 135
# A different concurrency limit for bulk tenant+timeline creations: these do I/O and will
# start timing on test nodes if we aren't a bit careful.
create_concurrency = 16
class Operation(str, Enum):
TIMELINE_OPS = "timeline_ops"
SHARD_MIGRATE = "shard_migrate"
TENANT_PASSTHROUGH = "tenant_passthrough"
run_ops = api_concurrency * 4
assert run_ops < len(tenants)
# Creation phase: make a lot of tenants, and create timelines in a subset of them
# This executor has concurrency set modestly, to avoid overloading pageservers with timeline creations.
with concurrent.futures.ThreadPoolExecutor(max_workers=create_concurrency) as executor:
tenant_create_futs = []
# We will create tenants directly via API, not via neon_local, to avoid any false
# serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
with concurrent.futures.ThreadPoolExecutor(max_workers=api_concurrency) as executor:
futs = []
t1 = time.time()
for tenant_id, tenant in tenants.items():
if tenant.large:
shard_count = large_tenant_shard_count
else:
shard_count = 1
# We will create tenants directly via API, not via neon_local, to avoid any false
# serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
for tenant_id in tenants:
f = executor.submit(
env.storage_controller.tenant_create,
tenant_id,
@@ -203,106 +152,44 @@ def test_storage_controller_many_tenants(
tenant_config={"heatmap_period": "10s"},
placement_policy={"Attached": 1},
)
tenant_create_futs.append(f)
futs.append(f)
# Wait for tenant creations to finish
for f in tenant_create_futs:
# Wait for creations to finish
for f in futs:
f.result()
log.info(
f"Created {len(tenants)} tenants in {time.time() - t1}, {len(tenants) / (time.time() - t1)}/s"
)
# Waiting for optimizer to stabilize, if it disagrees with scheduling (the correct behavior
# would be for original scheduling decisions to always match optimizer's preference)
# (workaround for https://github.com/neondatabase/neon/issues/8969)
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
# Create timelines in those tenants which are going to get one
t1 = time.time()
timeline_create_futs = []
for tenant_id in tenants_with_timelines:
timeline_id = TimelineId.generate()
tenants[tenant_id].timeline_id = timeline_id
f = executor.submit(
env.storage_controller.pageserver_api().timeline_create,
PgVersion.NOT_SET,
tenant_id,
timeline_id,
)
timeline_create_futs.append(f)
for f in timeline_create_futs:
f.result()
log.info(
f"Created {len(tenants_with_timelines)} timelines in {time.time() - t1}, {len(tenants_with_timelines) / (time.time() - t1)}/s"
)
# Plan operations: ensure each tenant with a timeline gets at least
# one of each operation type. Then add other tenants to make up the
# numbers.
ops_plan = []
for tenant_id in tenants_with_timelines:
ops_plan.append((tenant_id, Operation.TIMELINE_OPS))
ops_plan.append((tenant_id, Operation.SHARD_MIGRATE))
ops_plan.append((tenant_id, Operation.TENANT_PASSTHROUGH))
# Fill up remaining run_ops with migrations of tenants without timelines
other_migrate_tenants = rng.sample(tenants_without_timelines, run_ops - len(ops_plan))
for tenant_id in other_migrate_tenants:
ops_plan.append(
(
tenant_id,
rng.choice([Operation.SHARD_MIGRATE, Operation.TENANT_PASSTHROUGH]),
)
)
# Exercise phase: pick pseudo-random operations to do on the tenants + timelines
# This executor has concurrency high enough to stress the storage controller API.
with concurrent.futures.ThreadPoolExecutor(max_workers=api_concurrency) as executor:
def exercise_timeline_ops(tenant_id, timeline_id):
# A read operation: this requires looking up shard zero and routing there
detail = virtual_ps_http.timeline_detail(tenant_id, timeline_id)
assert detail["timeline_id"] == str(timeline_id)
# A fan-out write operation to all shards in a tenant.
# - We use a metadata operation rather than something like a timeline create, because
# timeline creations are I/O intensive and this test isn't meant to be a stress test for
# doing lots of concurrent timeline creations.
archival_state = rng.choice(
[TimelineArchivalState.ARCHIVED, TimelineArchivalState.UNARCHIVED]
)
virtual_ps_http.timeline_archival_config(tenant_id, timeline_id, archival_state)
run_ops = api_concurrency * 4
assert run_ops < len(tenants)
op_tenants = list(tenants)[0:run_ops]
# Generate a mixture of operations and dispatch them all concurrently
futs = []
for tenant_id, op in ops_plan:
if op == Operation.TIMELINE_OPS:
op_timeline_id = tenants[tenant_id].timeline_id
assert op_timeline_id is not None
# Exercise operations that modify tenant scheduling state but require traversing
# the fan-out-to-all-shards functionality.
for tenant_id in op_tenants:
op = rng.choice([0, 1, 2])
if op == 0:
# A fan-out write operation to all shards in a tenant (timeline creation)
f = executor.submit(
exercise_timeline_ops,
virtual_ps_http.timeline_create,
PgVersion.NOT_SET,
tenant_id,
op_timeline_id,
TimelineId.generate(),
)
elif op == Operation.SHARD_MIGRATE:
elif op == 1:
# A reconciler operation: migrate a shard.
desc = env.storage_controller.tenant_describe(tenant_id)
shard_number = rng.randint(0, len(desc["shards"]) - 1)
tenant_shard_id = TenantShardId(tenant_id, shard_number, len(desc["shards"]))
shard_number = rng.randint(0, shard_count - 1)
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
# Migrate it to its secondary location
desc = env.storage_controller.tenant_describe(tenant_id)
dest_ps_id = desc["shards"][shard_number]["node_secondary"][0]
f = executor.submit(
env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
)
elif op == Operation.TENANT_PASSTHROUGH:
elif op == 2:
# A passthrough read to shard zero
f = executor.submit(virtual_ps_http.tenant_status, tenant_id)
@@ -312,18 +199,10 @@ def test_storage_controller_many_tenants(
for f in futs:
f.result()
log.info("Completed mixed operations phase")
# Some of the operations above (notably migrations) might leave the controller in a state where it has
# some work to do, for example optimizing shard placement after we do a random migration. Wait for the system
# to reach a quiescent state before doing following checks.
#
# - Set max_interval low because we probably have a significant number of optimizations to complete and would like
# the test to run quickly.
# - Set timeout high because we might be waiting for optimizations that reuqire a secondary
# to warm up, and if we just started a secondary in the previous step, it might wait some time
# before downloading its heatmap
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
check_memory()
@@ -334,7 +213,6 @@ def test_storage_controller_many_tenants(
#
# We do not require that the system is quiescent already here, although at present in this point in the test
# that may be the case.
log.info("Reconciling all & timing")
while True:
t1 = time.time()
reconcilers = env.storage_controller.reconcile_all()
@@ -347,7 +225,6 @@ def test_storage_controller_many_tenants(
break
# Restart the storage controller
log.info("Restarting controller")
env.storage_controller.stop()
env.storage_controller.start()
@@ -369,16 +246,7 @@ def test_storage_controller_many_tenants(
# Restart pageservers gracefully: this exercises the /re-attach pageserver API
# and the storage controller drain and fill API
log.info("Restarting pageservers...")
# Parameters for how long we expect it to take to migrate all of the tenants from/to
# a node during a drain/fill operation
DRAIN_FILL_TIMEOUT = 240
DRAIN_FILL_BACKOFF = 5
for ps in env.pageservers:
log.info(f"Draining pageserver {ps.id}")
t1 = time.time()
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
)
@@ -387,10 +255,9 @@ def test_storage_controller_many_tenants(
ps.id,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.PAUSE_FOR_RESTART,
max_attempts=DRAIN_FILL_TIMEOUT // DRAIN_FILL_BACKOFF,
backoff=DRAIN_FILL_BACKOFF,
max_attempts=24,
backoff=5,
)
log.info(f"Drained pageserver {ps.id} in {time.time() - t1}s")
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
@@ -408,7 +275,6 @@ def test_storage_controller_many_tenants(
backoff=1,
)
log.info(f"Filling pageserver {ps.id}")
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
@@ -416,23 +282,16 @@ def test_storage_controller_many_tenants(
ps.id,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.ACTIVE,
max_attempts=DRAIN_FILL_TIMEOUT // DRAIN_FILL_BACKOFF,
backoff=DRAIN_FILL_BACKOFF,
max_attempts=24,
backoff=5,
)
log.info(f"Filled pageserver {ps.id} in {time.time() - t1}s")
# Waiting for optimizer to stabilize, if it disagrees with scheduling (the correct behavior
# would be for original scheduling decisions to always match optimizer's preference)
# (workaround for https://github.com/neondatabase/neon/issues/8969)
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
assert_consistent_balanced_attachments(env, total_shards)
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
# Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,

View File

@@ -4,7 +4,7 @@ import enum
import json
import os
import time
from typing import TYPE_CHECKING
from typing import Optional
import pytest
from fixtures.log_helper import log
@@ -16,10 +16,6 @@ from fixtures.pageserver.http import PageserverApiException
from fixtures.utils import wait_until
from fixtures.workload import Workload
if TYPE_CHECKING:
from typing import Optional
AGGRESIVE_COMPACTION_TENANT_CONF = {
# Disable gc and compaction. The test runs compaction manually.
"gc_period": "0s",

View File

@@ -9,7 +9,6 @@ from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
import fixtures.utils
import pytest
import toml
from fixtures.common_types import TenantId, TimelineId
@@ -94,34 +93,6 @@ if TYPE_CHECKING:
# # Run forward compatibility test
# ./scripts/pytest -k test_forward_compatibility
#
#
# How to run `test_version_mismatch` locally:
#
# export DEFAULT_PG_VERSION=16
# export BUILD_TYPE=release
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
# export NEON_BIN=target/release
# export POSTGRES_DISTRIB_DIR=pg_install
#
# # Build previous version of binaries and store them somewhere:
# rm -rf pg_install target
# git checkout <previous version>
# CARGO_BUILD_FLAGS="--features=testing" make -s -j`nproc`
# mkdir -p neon_previous/target
# cp -a target/${BUILD_TYPE} ./neon_previous/target/${BUILD_TYPE}
# cp -a pg_install ./neon_previous/pg_install
#
# # Build current version of binaries and create a data snapshot:
# rm -rf pg_install target
# git checkout <current version>
# CARGO_BUILD_FLAGS="--features=testing" make -s -j`nproc`
# ./scripts/pytest -k test_create_snapshot
#
# # Run the version mismatch test
# ./scripts/pytest -k test_version_mismatch
check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
os.environ.get("CHECK_ONDISK_DATA_COMPATIBILITY") is None,
@@ -195,11 +166,16 @@ def test_backward_compatibility(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir: Path,
):
"""
Test that the new binaries can read old data
"""
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
), f"COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg{pg_version.v_prefixed}` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
breaking_changes_allowed = (
os.environ.get("ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
)
@@ -238,11 +214,27 @@ def test_forward_compatibility(
test_output_dir: Path,
top_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir: Path,
):
"""
Test that the old binaries can read new data
"""
compatibility_neon_bin_env = os.environ.get("COMPATIBILITY_NEON_BIN")
assert compatibility_neon_bin_env is not None, (
"COMPATIBILITY_NEON_BIN is not set. It should be set to a path with Neon binaries "
"(ideally generated by the previous version of Neon)"
)
compatibility_neon_bin = Path(compatibility_neon_bin_env).resolve()
compatibility_postgres_distrib_dir_env = os.environ.get("COMPATIBILITY_POSTGRES_DISTRIB_DIR")
assert (
compatibility_postgres_distrib_dir_env is not None
), "COMPATIBILITY_POSTGRES_DISTRIB_DIR is not set. It should be set to a pg_install directrory (ideally generated by the previous version of Neon)"
compatibility_postgres_distrib_dir = Path(compatibility_postgres_distrib_dir_env).resolve()
compatibility_snapshot_dir = (
top_output_dir / f"compatibility_snapshot_pg{pg_version.v_prefixed}"
)
breaking_changes_allowed = (
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
)
@@ -253,14 +245,9 @@ def test_forward_compatibility(
# Use previous version's production binaries (pageserver, safekeeper, pg_distrib_dir, etc.).
# But always use the current version's neon_local binary.
# This is because we want to test the compatibility of the data format, not the compatibility of the neon_local CLI.
assert (
neon_env_builder.compatibility_neon_binpath is not None
), "the environment variable COMPATIBILITY_NEON_BIN is required"
assert (
neon_env_builder.compatibility_pg_distrib_dir is not None
), "the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required"
neon_env_builder.neon_binpath = neon_env_builder.compatibility_neon_binpath
neon_env_builder.pg_distrib_dir = neon_env_builder.compatibility_pg_distrib_dir
neon_env_builder.neon_binpath = compatibility_neon_bin
neon_env_builder.pg_distrib_dir = compatibility_postgres_distrib_dir
neon_env_builder.neon_local_binpath = neon_env_builder.neon_local_binpath
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",
@@ -571,29 +558,3 @@ def test_historic_storage_formats(
env.pageserver.http_client().timeline_compact(
dataset.tenant_id, existing_timeline_id, force_image_layer_creation=True
)
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_versions_mismatch(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir,
combination,
):
"""
Checks compatibility of different combinations of versions of the components
"""
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",
)
env.pageserver.allowed_errors.extend(
[".*ingesting record with timestamp lagging more than wait_lsn_timeout.+"]
)
env.start()
check_neon_works(
env, test_output_dir, compatibility_snapshot_dir / "dump.sql", test_output_dir / "repo"
)

View File

@@ -162,11 +162,6 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID)
env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 1)
# We will stop the storage controller while it may have requests in
# flight, and the pageserver complains when requests are abandoned.
for ps in env.pageservers:
ps.allowed_errors.append(".*request was dropped before completing.*")
# Keep NeonEnv state up to date, it usually owns starting/stopping services
env.pageservers[0].running = False
env.pageservers[1].running = False

View File

@@ -15,7 +15,7 @@ import enum
import os
import re
import time
from typing import TYPE_CHECKING
from typing import Optional
import pytest
from fixtures.common_types import TenantId, TimelineId
@@ -40,10 +40,6 @@ from fixtures.remote_storage import (
from fixtures.utils import wait_until
from fixtures.workload import Workload
if TYPE_CHECKING:
from typing import Optional
# A tenant configuration that is convenient for generating uploads and deletions
# without a large amount of postgres traffic.
TENANT_CONF = {

View File

@@ -23,7 +23,6 @@ from fixtures.remote_storage import s3_storage
from fixtures.utils import wait_until
from fixtures.workload import Workload
from pytest_httpserver import HTTPServer
from typing_extensions import override
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -955,7 +954,6 @@ class PageserverFailpoint(Failure):
self.pageserver_id = pageserver_id
self._mitigate = mitigate
@override
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.allowed_errors.extend(
@@ -963,23 +961,19 @@ class PageserverFailpoint(Failure):
)
pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))
@override
def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
if self._mitigate:
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Active"})
@override
def expect_available(self):
return True
@override
def can_mitigate(self):
return self._mitigate
@override
def mitigate(self, env: NeonEnv):
def mitigate(self, env):
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})
@@ -989,11 +983,9 @@ class StorageControllerFailpoint(Failure):
self.pageserver_id = None
self.action = action
@override
def apply(self, env: NeonEnv):
env.storage_controller.configure_failpoints((self.failpoint, self.action))
@override
def clear(self, env: NeonEnv):
if "panic" in self.action:
log.info("Restarting storage controller after panic")
@@ -1002,19 +994,16 @@ class StorageControllerFailpoint(Failure):
else:
env.storage_controller.configure_failpoints((self.failpoint, "off"))
@override
def expect_available(self):
# Controller panics _do_ leave pageservers available, but our test code relies
# on using the locate API to update configurations in Workload, so we must skip
# these actions when the controller has been panicked.
return "panic" not in self.action
@override
def can_mitigate(self):
return False
@override
def fails_forward(self, env: NeonEnv):
def fails_forward(self, env):
# Edge case: the very last failpoint that simulates a DB connection error, where
# the abort path will fail-forward and result in a complete split.
fail_forward = self.failpoint == "shard-split-post-complete"
@@ -1028,7 +1017,6 @@ class StorageControllerFailpoint(Failure):
return fail_forward
@override
def expect_exception(self):
if "panic" in self.action:
return requests.exceptions.ConnectionError
@@ -1041,22 +1029,18 @@ class NodeKill(Failure):
self.pageserver_id = pageserver_id
self._mitigate = mitigate
@override
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=True)
@override
def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.start()
@override
def expect_available(self):
return False
@override
def mitigate(self, env: NeonEnv):
def mitigate(self, env):
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})
@@ -1075,26 +1059,21 @@ class CompositeFailure(Failure):
self.pageserver_id = f.pageserver_id
break
@override
def apply(self, env: NeonEnv):
for f in self.failures:
f.apply(env)
@override
def clear(self, env: NeonEnv):
def clear(self, env):
for f in self.failures:
f.clear(env)
@override
def expect_available(self):
return all(f.expect_available() for f in self.failures)
@override
def mitigate(self, env: NeonEnv):
def mitigate(self, env):
for f in self.failures:
f.mitigate(env)
@override
def expect_exception(self):
expect = set(f.expect_exception() for f in self.failures)
@@ -1232,7 +1211,7 @@ def test_sharding_split_failures(
assert attached_count == initial_shard_count
def assert_split_done(exclude_ps_id: Optional[int] = None) -> None:
def assert_split_done(exclude_ps_id=None) -> None:
secondary_count = 0
attached_count = 0
for ps in env.pageservers:

View File

@@ -9,7 +9,6 @@ from datetime import datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING
import fixtures.utils
import pytest
from fixtures.auth_tokens import TokenScope
from fixtures.common_types import TenantId, TenantShardId, TimelineId
@@ -39,11 +38,7 @@ from fixtures.pg_version import PgVersion, run_only_on_default_postgres
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.storage_controller_proxy import StorageControllerProxy
from fixtures.utils import (
run_pg_bench_small,
subprocess_capture,
wait_until,
)
from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until
from fixtures.workload import Workload
from mypy_boto3_s3.type_defs import (
ObjectTypeDef,
@@ -65,8 +60,9 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
return counts
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
def test_storage_controller_smoke(
neon_env_builder: NeonEnvBuilder,
):
"""
Test the basic lifecycle of a storage controller:
- Restarting
@@ -1042,7 +1038,7 @@ def test_storage_controller_tenant_deletion(
)
# Break the compute hook: we are checking that deletion does not depend on the compute hook being available
def break_hook(_body: Any):
def break_hook():
raise RuntimeError("Unexpected call to compute hook")
compute_reconfigure_listener.register_on_notify(break_hook)
@@ -1304,11 +1300,11 @@ def test_storage_controller_heartbeats(
node_to_tenants = build_node_to_tenants_map(env)
log.info(f"Back online: {node_to_tenants=}")
# ... background reconciliation may need to run to clean up the location on the node that was offline
env.storage_controller.reconcile_until_idle()
# ... expecting the storage controller to reach a consistent state
env.storage_controller.consistency_check()
def storage_controller_consistent():
env.storage_controller.consistency_check()
wait_until(30, 1, storage_controller_consistent)
def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):

View File

@@ -6,7 +6,7 @@ import shutil
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING
from typing import Optional
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
@@ -20,9 +20,6 @@ from fixtures.remote_storage import S3Storage, s3_storage
from fixtures.utils import wait_until
from fixtures.workload import Workload
if TYPE_CHECKING:
from typing import Optional
@pytest.mark.parametrize("shard_count", [None, 4])
def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):

View File

@@ -479,9 +479,9 @@ def assert_size_approx_equal(size_a, size_b):
"""
# Determined empirically from examples of equality failures: they differ
# by page multiples of 8272, and usually by 1-3 pages. Tolerate 6 to avoid
# by page multiples of 8272, and usually by 1-3 pages. Tolerate 4 to avoid
# failing on outliers from that observed range.
threshold = 6 * 8272
threshold = 4 * 8272
assert size_a == pytest.approx(size_b, abs=threshold)

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
import concurrent.futures
import os
import threading
import time
from contextlib import closing
from datetime import datetime
@@ -11,7 +10,7 @@ from pathlib import Path
import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.common_types import Lsn, TenantId
from fixtures.log_helper import log
from fixtures.metrics import (
PAGESERVER_GLOBAL_METRICS,
@@ -477,34 +476,3 @@ def test_pageserver_metrics_many_relations(neon_env_builder: NeonEnvBuilder):
assert counts
log.info(f"directory counts: {counts}")
assert counts[2] > COUNT_AT_LEAST_EXPECTED
def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):
"""
(Relaxed) regression test for issue that led to https://github.com/neondatabase/neon/pull/9268
Create many endpoints in parallel and then restart them
"""
env = neon_simple_env
# This param needs to be 200+ to reproduce the limit issue
n_threads = 16
barrier = threading.Barrier(n_threads)
def test_timeline(branch_name: str, timeline_id: TimelineId):
endpoint = env.endpoints.create_start(branch_name)
endpoint.stop()
# Use a barrier to make sure we restart endpoints at the same time
barrier.wait()
endpoint.start()
workers = []
for i in range(0, n_threads):
branch_name = f"branch_{i}"
timeline_id = env.create_branch(branch_name)
w = threading.Thread(target=test_timeline, args=[branch_name, timeline_id])
workers.append(w)
w.start()
for w in workers:
w.join()

View File

@@ -58,7 +58,6 @@ num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", default-features = false, features = ["with-serde_json-1"] }
prost = { version = "0.13", features = ["prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
@@ -67,7 +66,7 @@ regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["alloc", "raw_value"] }
serde_json = { version = "1", features = ["raw_value"] }
sha2 = { version = "0.10", features = ["asm", "oid"] }
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
@@ -77,7 +76,6 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
tikv-jemalloc-sys = { version = "0.5" }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", features = ["with-serde_json-1"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }