Mirror of https://github.com/neondatabase/neon.git (synced 2026-05-16 20:50:37 +00:00)

Compare commits: 2 commits on jcsp/issue...grant-jwt-

| Author | SHA1 | Date |
|---|---|---|
|  | d6c661ccb6 |  |
|  | 0abf0f6dce |  |
@@ -183,7 +183,7 @@ runs:
       uses: actions/cache@v4
       with:
         path: ~/.cache/pypoetry/virtualenvs
-        key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
+        key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-${{ hashFiles('poetry.lock') }}

     - name: Store Allure test stat in the DB (new)
       if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}

@@ -88,7 +88,7 @@ runs:
       uses: actions/cache@v4
       with:
         path: ~/.cache/pypoetry/virtualenvs
-        key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
+        key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-${{ hashFiles('poetry.lock') }}

     - name: Install Python deps
       shell: bash -euxo pipefail {0}

@@ -218,9 +218,6 @@ runs:
         name: compatibility-snapshot-${{ runner.arch }}-${{ inputs.build_type }}-pg${{ inputs.pg_version }}
         # Directory is created by test_compatibility.py::test_create_snapshot, keep the path in sync with the test
         path: /tmp/test_output/compatibility_snapshot_pg${{ inputs.pg_version }}/
-        # The lack of compatibility snapshot shouldn't fail the job
-        # (for example if we didn't run the test for non build-and-test workflow)
-        skip-if-does-not-exist: true

     - name: Upload test results
       if: ${{ !cancelled() }}
.github/actions/upload/action.yml (vendored): 18 lines changed
@@ -7,10 +7,6 @@ inputs:
   path:
     description: "A directory or file to upload"
     required: true
-  skip-if-does-not-exist:
-    description: "Allow to skip if path doesn't exist, fail otherwise"
-    default: false
-    required: false
   prefix:
     description: "S3 prefix. Default is '${GITHUB_SHA}/${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
     required: false

@@ -19,12 +15,10 @@ runs:
 using: "composite"
 steps:
   - name: Prepare artifact
-    id: prepare-artifact
     shell: bash -euxo pipefail {0}
     env:
       SOURCE: ${{ inputs.path }}
       ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
-      SKIP_IF_DOES_NOT_EXIST: ${{ inputs.skip-if-does-not-exist }}
     run: |
       mkdir -p $(dirname $ARCHIVE)

@@ -39,22 +33,14 @@ runs:
       elif [ -f ${SOURCE} ]; then
         time tar -cf ${ARCHIVE} --zstd ${SOURCE}
       elif ! ls ${SOURCE} > /dev/null 2>&1; then
-        if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then
-          echo 'SKIPPED=true' >> $GITHUB_OUTPUT
-          exit 0
-        else
-          echo >&2 "${SOURCE} does not exist"
-          exit 2
-        fi
+        echo >&2 "${SOURCE} does not exist"
+        exit 2
       else
         echo >&2 "${SOURCE} is neither a directory nor a file, do not know how to handle it"
         exit 3
       fi

-      echo 'SKIPPED=false' >> $GITHUB_OUTPUT
-
   - name: Upload artifact
-    if: ${{ steps.prepare-artifact.outputs.SKIPPED == 'false' }}
     shell: bash -euxo pipefail {0}
     env:
       SOURCE: ${{ inputs.path }}
@@ -124,28 +124,28 @@ jobs:
       uses: actions/cache@v4
       with:
         path: pg_install/v14
-        key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
+        key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}

     - name: Cache postgres v15 build
       id: cache_pg_15
       uses: actions/cache@v4
       with:
         path: pg_install/v15
-        key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
+        key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}

     - name: Cache postgres v16 build
       id: cache_pg_16
       uses: actions/cache@v4
       with:
         path: pg_install/v16
-        key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
+        key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}

     - name: Cache postgres v17 build
       id: cache_pg_17
       uses: actions/cache@v4
       with:
         path: pg_install/v17
-        key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
+        key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}

     - name: Build postgres v14
       if: steps.cache_pg_14.outputs.cache-hit != 'true'
.github/workflows/build-build-tools-image.yml (vendored): 44 lines changed
@@ -19,16 +19,9 @@ defaults:
 run:
   shell: bash -euo pipefail {0}

-# The initial idea was to prevent the waste of resources by not re-building the `build-tools` image
-# for the same tag in parallel workflow runs, and queue them to be skipped once we have
-# the first image pushed to Docker registry, but GitHub's concurrency mechanism is not working as expected.
-# GitHub can't have more than 1 job in a queue and removes the previous one, which causes failures in the dependent jobs.
-#
-# Ref https://github.com/orgs/community/discussions/41518
-#
-# concurrency:
-#   group: build-build-tools-image-${{ inputs.image-tag }}
-#   cancel-in-progress: false
+concurrency:
+  group: build-build-tools-image-${{ inputs.image-tag }}
+  cancel-in-progress: false

 # No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
 permissions: {}

@@ -43,7 +36,6 @@ jobs:

 strategy:
   matrix:
-    debian-version: [ bullseye, bookworm ]
     arch: [ x64, arm64 ]

 runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}

@@ -82,22 +74,22 @@ jobs:

   - uses: docker/build-push-action@v6
     with:
-      file: Dockerfile.build-tools
       context: .
       provenance: false
       push: true
       pull: true
-      build-args: |
-        DEBIAN_VERSION=${{ matrix.debian-version }}
-      cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.debian-version }}-${{ matrix.arch }}
-      cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0}-{1},mode=max', matrix.debian-version, matrix.arch) || '' }}
-      tags: |
-        neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.debian-version }}-${{ matrix.arch }}
+      file: Dockerfile.build-tools
+      cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.arch }}
+      cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0},mode=max', matrix.arch) || '' }}
+      tags: neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.arch }}

 merge-images:
   needs: [ build-image ]
   runs-on: ubuntu-22.04

   env:
     IMAGE_TAG: ${{ inputs.image-tag }}

   steps:
     - uses: docker/login-action@v3
       with:
@@ -105,17 +97,7 @@ jobs:
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

   - name: Create multi-arch image
-    env:
-      DEFAULT_DEBIAN_VERSION: bullseye
-      IMAGE_TAG: ${{ inputs.image-tag }}
     run: |
-      for debian_version in bullseye bookworm; do
-        tags=("-t" "neondatabase/build-tools:${IMAGE_TAG}-${debian_version}")
-        if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
-          tags+=("-t" "neondatabase/build-tools:${IMAGE_TAG}")
-        fi
-
-        docker buildx imagetools create "${tags[@]}" \
-          neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-x64 \
-          neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-arm64
-      done
+      docker buildx imagetools create -t neondatabase/build-tools:${IMAGE_TAG} \
+        neondatabase/build-tools:${IMAGE_TAG}-x64 \
+        neondatabase/build-tools:${IMAGE_TAG}-arm64
.github/workflows/build_and_test.yml (vendored): 155 lines changed
@@ -92,7 +92,7 @@ jobs:
   needs: [ check-permissions, build-build-tools-image ]
   runs-on: [ self-hosted, small ]
   container:
-    image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    image: ${{ needs.build-build-tools-image.outputs.image }}
     credentials:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

@@ -106,7 +106,7 @@ jobs:
     uses: actions/cache@v4
     with:
       path: ~/.cache/pypoetry/virtualenvs
-      key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
+      key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-${{ hashFiles('poetry.lock') }}

   - name: Install Python deps
     run: ./scripts/pysync

@@ -181,7 +181,7 @@ jobs:
   runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}

   container:
-    image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    image: ${{ needs.build-build-tools-image.outputs.image }}
     credentials:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -193,15 +193,16 @@ jobs:
   with:
     submodules: true

-  - name: Cache cargo deps
-    uses: actions/cache@v4
-    with:
-      path: |
-        ~/.cargo/registry
-        !~/.cargo/registry/src
-        ~/.cargo/git
-        target
-      key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
+  # Disabled for now
+  # - name: Restore cargo deps cache
+  #   id: cache_cargo
+  #   uses: actions/cache@v4
+  #   with:
+  #     path: |
+  #       !~/.cargo/registry/src
+  #       ~/.cargo/git/
+  #       target/
+  #     key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-clippy-${{ hashFiles('rust-toolchain.toml') }}-${{ hashFiles('Cargo.lock') }}

   # Some of our rust modules use FFI and need those to be checked
   - name: Get postgres headers
@@ -261,7 +262,7 @@ jobs:
   uses: ./.github/workflows/_build-and-test-locally.yml
   with:
     arch: ${{ matrix.arch }}
-    build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}
     build-tag: ${{ needs.tag.outputs.build-tag }}
     build-type: ${{ matrix.build-type }}
     # Run tests on all Postgres versions in release builds and only on the latest version in debug builds

@@ -276,7 +277,7 @@ jobs:
   needs: [ check-permissions, build-build-tools-image ]
   runs-on: [ self-hosted, small ]
   container:
-    image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    image: ${{ needs.build-build-tools-image.outputs.image }}
     credentials:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

@@ -289,7 +290,7 @@ jobs:
     uses: actions/cache@v4
     with:
       path: ~/.cache/pypoetry/virtualenvs
-      key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
+      key: v1-${{ runner.os }}-${{ runner.arch }}-python-deps-${{ hashFiles('poetry.lock') }}

   - name: Install Python deps
     run: ./scripts/pysync
@@ -309,7 +310,7 @@ jobs:
   needs: [ check-permissions, build-and-test-locally, build-build-tools-image, get-benchmarks-durations ]
   runs-on: [ self-hosted, small ]
   container:
-    image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    image: ${{ needs.build-build-tools-image.outputs.image }}
     credentials:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

@@ -367,7 +368,7 @@ jobs:

   runs-on: [ self-hosted, small ]
   container:
-    image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    image: ${{ needs.build-build-tools-image.outputs.image }}
     credentials:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

@@ -415,7 +416,7 @@ jobs:
   needs: [ check-permissions, build-build-tools-image, build-and-test-locally ]
   runs-on: [ self-hosted, small ]
   container:
-    image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    image: ${{ needs.build-build-tools-image.outputs.image }}
     credentials:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
@@ -559,16 +560,15 @@ jobs:
     ADDITIONAL_RUSTFLAGS=${{ matrix.arch == 'arm64' && '-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1' || '' }}
     GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
     BUILD_TAG=${{ needs.tag.outputs.build-tag }}
-    TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-bookworm
-    DEBIAN_VERSION=bookworm
+    TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
   provenance: false
   push: true
   pull: true
   file: Dockerfile
-  cache-from: type=registry,ref=cache.neon.build/neon:cache-bookworm-${{ matrix.arch }}
-  cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon:cache-{0}-{1},mode=max', 'bookworm', matrix.arch) || '' }}
+  cache-from: type=registry,ref=cache.neon.build/neon:cache-${{ matrix.arch }}
+  cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon:cache-{0},mode=max', matrix.arch) || '' }}
   tags: |
-    neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm-${{ matrix.arch }}
+    neondatabase/neon:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}

 neon-image:
   needs: [ neon-image-arch, tag ]

@@ -583,9 +583,8 @@ jobs:
   - name: Create multi-arch image
     run: |
       docker buildx imagetools create -t neondatabase/neon:${{ needs.tag.outputs.build-tag }} \
-        -t neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm \
-        neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm-x64 \
-        neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm-arm64
+        neondatabase/neon:${{ needs.tag.outputs.build-tag }}-x64 \
+        neondatabase/neon:${{ needs.tag.outputs.build-tag }}-arm64

   - uses: docker/login-action@v3
     with:
@@ -606,16 +605,17 @@ jobs:
   version:
     # Much data was already generated on old PG versions with bullseye's
     # libraries, the locales of which can cause data incompatibilities.
-    # However, new PG versions should be build on newer images,
-    # as that reduces the support burden of old and ancient distros.
+    # However, new PG versions should check if they can be built on newer
+    # images, as that reduces the support burden of old and ancient
+    # distros.
     - pg: v14
-      debian: bullseye
+      debian: bullseye-slim
     - pg: v15
-      debian: bullseye
+      debian: bullseye-slim
     - pg: v16
-      debian: bullseye
+      debian: bullseye-slim
     - pg: v17
-      debian: bookworm
+      debian: bookworm-slim
   arch: [ x64, arm64 ]

 runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
@@ -660,16 +660,16 @@ jobs:
         GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
         PG_VERSION=${{ matrix.version.pg }}
         BUILD_TAG=${{ needs.tag.outputs.build-tag }}
-        TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
-        DEBIAN_VERSION=${{ matrix.version.debian }}
+        TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
+        DEBIAN_FLAVOR=${{ matrix.version.debian }}
       provenance: false
       push: true
       pull: true
       file: compute/Dockerfile.compute-node
-      cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
-      cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
+      cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.arch }}
+      cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }}
       tags: |
-        neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-${{ matrix.arch }}
+        neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}

   - name: Build neon extensions test image
     if: matrix.version.pg == 'v16'

@@ -680,17 +680,17 @@ jobs:
         GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
         PG_VERSION=${{ matrix.version.pg }}
         BUILD_TAG=${{ needs.tag.outputs.build-tag }}
-        TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
-        DEBIAN_VERSION=${{ matrix.version.debian }}
+        TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
+        DEBIAN_FLAVOR=${{ matrix.version.debian }}
       provenance: false
       push: true
       pull: true
       file: compute/Dockerfile.compute-node
       target: neon-pg-ext-test
-      cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
-      cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
+      cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.arch }}
+      cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }}
       tags: |
-        neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.version.debian }}-${{ matrix.arch }}
+        neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }}

   - name: Build compute-tools image
     # compute-tools are Postgres independent, so build it only once

@@ -705,16 +705,14 @@ jobs:
       build-args: |
         GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
         BUILD_TAG=${{ needs.tag.outputs.build-tag }}
-        TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
-        DEBIAN_VERSION=${{ matrix.version.debian }}
+        TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
+        DEBIAN_FLAVOR=${{ matrix.version.debian }}
       provenance: false
       push: true
       pull: true
       file: compute/Dockerfile.compute-node
-      cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
-      cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-tools-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
       tags: |
-        neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-${{ matrix.arch }}
+        neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}

 compute-node-image:
   needs: [ compute-node-image-arch, tag ]
@@ -722,16 +720,7 @@ jobs:

 strategy:
   matrix:
-    version:
-      # see the comment for `compute-node-image-arch` job
-      - pg: v14
-        debian: bullseye
-      - pg: v15
-        debian: bullseye
-      - pg: v16
-        debian: bullseye
-      - pg: v17
-        debian: bookworm
+    version: [ v14, v15, v16, v17 ]

 steps:
   - uses: docker/login-action@v3

@@ -741,26 +730,23 @@ jobs:

   - name: Create multi-arch compute-node image
     run: |
-      docker buildx imagetools create -t neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-        -t neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
-        neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
-        neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
+      docker buildx imagetools create -t neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
+        neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-x64 \
+        neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-arm64

   - name: Create multi-arch neon-test-extensions image
-    if: matrix.version.pg == 'v16'
+    if: matrix.version == 'v16'
     run: |
-      docker buildx imagetools create -t neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-        -t neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
-        neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
-        neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
+      docker buildx imagetools create -t neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
+        neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-x64 \
+        neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-arm64

   - name: Create multi-arch compute-tools image
-    if: matrix.version.pg == 'v16'
+    if: matrix.version == 'v17'
     run: |
       docker buildx imagetools create -t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }} \
-        -t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
-        neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
-        neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
+        neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-x64 \
+        neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-arm64

   - uses: docker/login-action@v3
     with:
@@ -768,13 +754,13 @@ jobs:
   username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
   password: ${{ secrets.AWS_SECRET_KEY_DEV }}

-  - name: Push multi-arch compute-node-${{ matrix.version.pg }} image to ECR
+  - name: Push multi-arch compute-node-${{ matrix.version }} image to ECR
     run: |
-      docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-        neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
+      docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
+        neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}

   - name: Push multi-arch compute-tools image to ECR
-    if: matrix.version.pg == 'v16'
+    if: matrix.version == 'v17'
     run: |
       docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{ needs.tag.outputs.build-tag }} \
         neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}

@@ -785,16 +771,7 @@ jobs:
 strategy:
   fail-fast: false
   matrix:
-    version:
-      # see the comment for `compute-node-image-arch` job
-      - pg: v14
-        debian: bullseye
-      - pg: v15
-        debian: bullseye
-      - pg: v16
-        debian: bullseye
-      - pg: v17
-        debian: bookworm
+    version: [ v14, v15, v16, v17 ]
 env:
   VM_BUILDER_VERSION: v0.35.0
@@ -816,18 +793,18 @@ jobs:
   # it won't have the proper authentication (written at v0.6.0)
   - name: Pulling compute-node image
     run: |
-      docker pull neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
+      docker pull neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}

   - name: Build vm image
     run: |
       ./vm-builder \
-        -spec=compute/vm-image-spec-${{ matrix.version.debian }}.yaml \
-        -src=neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-        -dst=neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
+        -spec=compute/vm-image-spec.yaml \
+        -src=neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
+        -dst=neondatabase/vm-compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}

   - name: Pushing vm-compute-node image
     run: |
-      docker push neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
+      docker push neondatabase/vm-compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}

 test-images:
   needs: [ check-permissions, tag, neon-image, compute-node-image ]
.github/workflows/neon_extra_builds.yml (vendored): 2 lines changed
@@ -155,7 +155,7 @@ jobs:
     github.ref_name == 'main'
   runs-on: [ self-hosted, large ]
   container:
-    image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    image: ${{ needs.build-build-tools-image.outputs.image }}
     credentials:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
.github/workflows/pg-clients.yml (vendored): 4 lines changed
@@ -55,7 +55,7 @@ jobs:
   runs-on: ubuntu-22.04

   container:
-    image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    image: ${{ needs.build-build-tools-image.outputs.image }}
     credentials:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

@@ -150,7 +150,7 @@ jobs:
   runs-on: ubuntu-22.04

   container:
-    image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    image: ${{ needs.build-build-tools-image.outputs.image }}
     credentials:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
.github/workflows/pin-build-tools-image.yml (vendored): 23 lines changed
@@ -71,6 +71,7 @@ jobs:

 steps:
   - uses: docker/login-action@v3
     with:
       username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
       password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

@@ -93,22 +94,8 @@ jobs:
   az acr login --name=neoneastus2

 - name: Tag build-tools with `${{ env.TO_TAG }}` in Docker Hub, ECR, and ACR
-  env:
-    DEFAULT_DEBIAN_VERSION: bullseye
   run: |
-    for debian_version in bullseye bookworm; do
-      tags=()
-
-      tags+=("-t" "neondatabase/build-tools:${TO_TAG}-${debian_version}")
-      tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}-${debian_version}")
-      tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}-${debian_version}")
-
-      if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
-        tags+=("-t" "neondatabase/build-tools:${TO_TAG}")
-        tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}")
-        tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}")
-      fi
-
-      docker buildx imagetools create "${tags[@]}" \
-        neondatabase/build-tools:${FROM_TAG}-${debian_version}
-    done
+    docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG} \
+      -t neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG} \
+      -t neondatabase/build-tools:${TO_TAG} \
+      neondatabase/build-tools:${FROM_TAG}
.github/workflows/report-workflow-stats.yml (vendored): 2 lines changed
@@ -33,7 +33,7 @@ jobs:
   actions: read
 steps:
   - name: Export GH Workflow Stats
-    uses: neondatabase/gh-workflow-stats-action@v0.1.4
+    uses: fedordikarev/gh-workflow-stats-action@v0.1.2
     with:
       DB_URI: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
       DB_TABLE: "gh_workflow_stats_neon"
@@ -1,6 +1,5 @@
 /compute_tools/ @neondatabase/control-plane @neondatabase/compute
 /storage_controller @neondatabase/storage
-/storage_scrubber @neondatabase/storage
 /libs/pageserver_api/ @neondatabase/storage
 /libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
 /libs/remote_storage/ @neondatabase/storage
@@ -7,8 +7,6 @@ ARG IMAGE=build-tools
 ARG TAG=pinned
 ARG DEFAULT_PG_VERSION=17
 ARG STABLE_PG_VERSION=16
-ARG DEBIAN_VERSION=bullseye
-ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim

 # Build Postgres
 FROM $REPOSITORY/$IMAGE:$TAG AS pg-build

@@ -59,7 +57,7 @@ RUN set -e \

 # Build final image
 #
-FROM debian:${DEBIAN_FLAVOR}
+FROM debian:bullseye-slim
 ARG DEFAULT_PG_VERSION
 WORKDIR /data
@@ -1,7 +1,12 @@
-ARG DEBIAN_VERSION=bullseye
+FROM debian:bullseye-slim

-FROM debian:${DEBIAN_VERSION}-slim
-ARG DEBIAN_VERSION
+# Use ARG as a build-time environment variable here to allow.
+# It's not supposed to be set outside.
+# Alternatively it can be obtained using the following command
+# ```
+# . /etc/os-release && echo "${VERSION_CODENAME}"
+# ```
+ARG DEBIAN_VERSION_CODENAME=bullseye

 # Add nonroot user
 RUN useradd -ms /bin/bash nonroot -b /home

@@ -37,14 +42,14 @@ RUN set -e \
     libseccomp-dev \
     libsqlite3-dev \
     libssl-dev \
-    $([[ "${DEBIAN_VERSION}" = "bullseye" ]] && libstdc++-10-dev || libstdc++-11-dev) \
+    libstdc++-10-dev \
     libtool \
     libxml2-dev \
     libxmlsec1-dev \
     libxxhash-dev \
     lsof \
     make \
-    netcat-openbsd \
+    netcat \
     net-tools \
     openssh-client \
     parallel \

@@ -73,7 +78,7 @@ RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/
 # LLVM
 ENV LLVM_VERSION=18
 RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
-    && echo "deb http://apt.llvm.org/${DEBIAN_VERSION}/ llvm-toolchain-${DEBIAN_VERSION}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
+    && echo "deb http://apt.llvm.org/${DEBIAN_VERSION_CODENAME}/ llvm-toolchain-${DEBIAN_VERSION_CODENAME}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
    && apt update \
    && apt install -y clang-${LLVM_VERSION} llvm-${LLVM_VERSION} \
    && bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \

@@ -81,7 +86,7 @@ RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \

 # Install docker
 RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
-    && echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION} stable" > /etc/apt/sources.list.d/docker.list \
+    && echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION_CODENAME} stable" > /etc/apt/sources.list.d/docker.list \
    && apt update \
    && apt install -y docker-ce docker-ce-cli \
    && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
@@ -3,8 +3,7 @@ ARG REPOSITORY=neondatabase
 ARG IMAGE=build-tools
 ARG TAG=pinned
 ARG BUILD_TAG
-ARG DEBIAN_VERSION=bullseye
-ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
+ARG DEBIAN_FLAVOR=bullseye-slim

 #########################################################################################
 #

@@ -12,23 +11,20 @@ ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
 #
 #########################################################################################
 FROM debian:$DEBIAN_FLAVOR AS build-deps
-ARG DEBIAN_VERSION
 ARG DEBIAN_FLAVOR

-RUN case $DEBIAN_VERSION in \
+RUN case $DEBIAN_FLAVOR in \
     # Version-specific installs for Bullseye (PG14-PG16):
     # The h3_pg extension needs a cmake 3.20+, but Debian bullseye has 3.18.
     # Install newer version (3.25) from backports.
-    bullseye) \
+    bullseye*) \
       echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
       VERSION_INSTALLS="cmake/bullseye-backports cmake-data/bullseye-backports"; \
       ;; \
     # Version-specific installs for Bookworm (PG17):
-    bookworm) \
+    bookworm*) \
       VERSION_INSTALLS="cmake"; \
       ;; \
     *) \
       echo "Unknown Debian version ${DEBIAN_VERSION}" && exit 1 \
       ;; \
     esac && \
     apt update && \
     apt install --no-install-recommends -y git autoconf automake libtool build-essential bison flex libreadline-dev \
@@ -1095,6 +1091,7 @@ RUN cd compute_tools && mold -run cargo build --locked --profile release-line-de
 #########################################################################################

 FROM debian:$DEBIAN_FLAVOR AS compute-tools-image
+ARG DEBIAN_FLAVOR

 COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl

@@ -1105,6 +1102,7 @@ COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compu
 #########################################################################################

 FROM debian:$DEBIAN_FLAVOR AS pgbouncer
+ARG DEBIAN_FLAVOR
 RUN set -e \
     && apt-get update \
     && apt-get install --no-install-recommends -y \

@@ -1259,7 +1257,7 @@ ENV PGDATABASE=postgres
 #
 #########################################################################################
 FROM debian:$DEBIAN_FLAVOR
-ARG DEBIAN_VERSION
+ARG DEBIAN_FLAVOR
 # Add user postgres
 RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
     echo "postgres:test_console_pass" | chpasswd && \
@@ -1307,22 +1305,19 @@ RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/loca

 RUN apt update && \
-    case $DEBIAN_VERSION in \
+    case $DEBIAN_FLAVOR in \
     # Version-specific installs for Bullseye (PG14-PG16):
     # libicu67, locales for collations (including ICU and plpgsql_check)
     # libgdal28, libproj19 for PostGIS
-    bullseye) \
+    bullseye*) \
       VERSION_INSTALLS="libicu67 libgdal28 libproj19"; \
       ;; \
     # Version-specific installs for Bookworm (PG17):
     # libicu72, locales for collations (including ICU and plpgsql_check)
     # libgdal32, libproj25 for PostGIS
-    bookworm) \
+    bookworm*) \
       VERSION_INSTALLS="libicu72 libgdal32 libproj25"; \
       ;; \
     *) \
       echo "Unknown Debian version ${DEBIAN_VERSION}" && exit 1 \
       ;; \
     esac && \
     apt install --no-install-recommends -y \
       gdb \
@@ -1,126 +0,0 @@
-# Supplemental file for neondatabase/autoscaling's vm-builder, for producing the VM compute image.
----
-commands:
-  - name: cgconfigparser
-    user: root
-    sysvInitAction: sysinit
-    shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664'
-  # restrict permissions on /neonvm/bin/resize-swap, because we grant access to compute_ctl for
-  # running it as root.
-  - name: chmod-resize-swap
-    user: root
-    sysvInitAction: sysinit
-    shell: 'chmod 711 /neonvm/bin/resize-swap'
-  - name: chmod-set-disk-quota
-    user: root
-    sysvInitAction: sysinit
-    shell: 'chmod 711 /neonvm/bin/set-disk-quota'
-  - name: pgbouncer
-    user: postgres
-    sysvInitAction: respawn
-    shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini'
-  - name: local_proxy
-    user: postgres
-    sysvInitAction: respawn
-    shell: '/usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
-  - name: postgres-exporter
-    user: nobody
-    sysvInitAction: respawn
-    shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter'
-  - name: sql-exporter
-    user: nobody
-    sysvInitAction: respawn
-    shell: '/bin/sql_exporter -config.file=/etc/sql_exporter.yml -web.listen-address=:9399'
-  - name: sql-exporter-autoscaling
-    user: nobody
-    sysvInitAction: respawn
-    shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
-shutdownHook: |
-  su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
-files:
-  - filename: compute_ctl-sudoers
-    content: |
-      # Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
-      # and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
-      # regardless of hostname (ALL)
-      postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
-  - filename: cgconfig.conf
-    content: |
-      # Configuration for cgroups in VM compute nodes
-      group neon-postgres {
-          perm {
-              admin {
-                  uid = postgres;
-              }
-              task {
-                  gid = users;
-              }
-          }
-          memory {}
-      }
-build: |
-  # Build cgroup-tools
-  #
-  # At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically
-  # libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-monitor
-  # requires cgroup v2, so we'll build cgroup-tools ourselves.
-  #
-  # At time of migration to bookworm (2024-10-09), debian has a version of libcgroup/cgroup-tools 2.0.2,
-  # and it _probably_ can be used as-is. However, we'll build it ourselves to minimise the changeset
-  # for the debian version migration.
-  #
-  FROM debian:bookworm-slim as libcgroup-builder
-  ENV LIBCGROUP_VERSION=v2.0.3
-
-  RUN set -exu \
-      && apt update \
-      && apt install --no-install-recommends -y \
-          git \
-          ca-certificates \
-          automake \
-          cmake \
-          make \
-          gcc \
-          byacc \
-          flex \
-          libtool \
-          libpam0g-dev \
-      && git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \
-      && INSTALL_DIR="/libcgroup-install" \
-      && mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \
-      && cd libcgroup \
-      # extracted from bootstrap.sh, with modified flags:
-      && (test -d m4 || mkdir m4) \
-      && autoreconf -fi \
-      && rm -rf autom4te.cache \
-      && CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \
-      # actually build the thing...
-      && make install
-merge: |
-  # tweak nofile limits
-  RUN set -e \
-      && echo 'fs.file-max = 1048576' >>/etc/sysctl.conf \
-      && test ! -e /etc/security || ( \
-          echo '* - nofile 1048576' >>/etc/security/limits.conf \
-          && echo 'root - nofile 1048576' >>/etc/security/limits.conf \
-      )
-
-  # Allow postgres user (compute_ctl) to run swap resizer.
-  # Need to install sudo in order to allow this.
-  #
-  # Also, remove the 'read' permission from group/other on /neonvm/bin/resize-swap, just to be safe.
-  RUN set -e \
-      && apt update \
-      && apt install --no-install-recommends -y \
-          sudo \
-      && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
-  COPY compute_ctl-sudoers /etc/sudoers.d/compute_ctl-sudoers
-
-  COPY cgconfig.conf /etc/cgconfig.conf
-
-  RUN set -e \
-      && chmod 0644 /etc/cgconfig.conf
-
-  COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
-  COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
-  COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/
@@ -318,12 +318,26 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
                     "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
                     name.pg_quote()
                 );
                 // If the role we're creating is intended for JWT login, we do
                 // not give it any attributes.
                 if jwks_roles.contains(name.as_str()) {
                     query = format!("CREATE ROLE {}", name.pg_quote());
                 }
                 info!("running role create query: '{}'", &query);
                 query.push_str(&role.to_pg_options());
                 xact.execute(query.as_str(), &[])?;
+
+                // If the role we're creating is intended for JWT login, we have
+                // to make sure it can execute functions in the auth schema.
+                if jwks_roles.contains(name.as_str()) {
+                    let mut grant_query = format!(
+                        "GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA auth TO {}",
+                        name.pg_quote()
+                    );
+                    info!("running grant query for JWT role: '{}'", &grant_query);
+                    grant_query.push_str(&role.to_pg_options());
+                    xact.execute(grant_query.as_str(), &[])?;
+                }
             }
         }
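For orientation, here is a minimal Rust sketch of the role-creation flow in this hunk. It uses plain strings and a hypothetical pg_quote helper in place of the crate's PgIdent, RoleAction, and Client types, and returns the SQL instead of executing it; it is illustrative only, not the crate's API.

// Sketch of the JWT-aware role creation, under simplified assumptions:
// jwks_roles holds role names that log in via JWT; pg_options stands in
// for Role::to_pg_options().
use std::collections::HashSet;

fn pg_quote(name: &str) -> String {
    // Approximate quote_ident-style escaping.
    format!("\"{}\"", name.replace('"', "\"\""))
}

fn role_queries(name: &str, jwks_roles: &HashSet<String>, pg_options: &str) -> Vec<String> {
    let mut queries = Vec::new();
    let mut query = if jwks_roles.contains(name) {
        // JWT-login roles get no extra attributes at creation time.
        format!("CREATE ROLE {}", pg_quote(name))
    } else {
        format!(
            "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
            pg_quote(name)
        )
    };
    query.push_str(pg_options);
    queries.push(query);

    if jwks_roles.contains(name) {
        // JWT-login roles must be able to call functions in the auth schema.
        queries.push(format!(
            "GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA auth TO {}",
            pg_quote(name)
        ));
    }
    queries
}

fn main() {
    let jwks: HashSet<String> = ["authenticated".to_string()].into();
    for q in role_queries("authenticated", &jwks, "") {
        println!("{q}");
    }
}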
@@ -97,21 +97,7 @@ impl ComputeControlPlane {
         for endpoint_dir in std::fs::read_dir(env.endpoints_path())
             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
         {
-            let ep_res = Endpoint::from_dir_entry(endpoint_dir?, &env);
-            let ep = match ep_res {
-                Ok(ep) => ep,
-                Err(e) => match e.downcast::<std::io::Error>() {
-                    Ok(e) => {
-                        // A parallel task could delete an endpoint while we have just scanned the directory
-                        if e.kind() == std::io::ErrorKind::NotFound {
-                            continue;
-                        } else {
-                            Err(e)?
-                        }
-                    }
-                    Err(e) => Err(e)?,
-                },
-            };
+            let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
             endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
         }
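A minimal std-only sketch of the pattern on the deletion side of this hunk: while scanning a directory, treat NotFound as "an entry was deleted concurrently" and skip it rather than failing the whole listing. The real code routes the error through anyhow and Endpoint::from_dir_entry; this version uses plain std::io.

use std::{fs, io, path::Path};

fn list_names(dir: &Path) -> io::Result<Vec<String>> {
    let mut names = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        // Touch the entry's metadata; a parallel task may have deleted the
        // entry between readdir() and this call.
        match entry.metadata() {
            Ok(_) => names.push(entry.file_name().to_string_lossy().into_owned()),
            Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(names)
}

fn main() -> io::Result<()> {
    for name in list_names(Path::new("."))? {
        println!("{name}");
    }
    Ok(())
}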
@@ -31,12 +31,9 @@ pub enum Scope {
     /// The scope used by pageservers in upcalls to storage controller and cloud control plane
     #[serde(rename = "generations_api")]
     GenerationsApi,

-    /// Allows access to control plane management API and all storage controller endpoints.
+    /// Allows access to control plane management API and some storage controller endpoints.
     Admin,

-    /// Allows access to control plane & storage controller endpoints used in infrastructure automation (e.g. node registration)
-    Infra,
-
     /// Allows access to storage controller APIs used by the scrubber, to interrogate the state
     /// of a tenant & post scrub results.
     Scrubber,
@@ -28,9 +28,6 @@ pub enum ApiError {
     #[error("Resource temporarily unavailable: {0}")]
     ResourceUnavailable(Cow<'static, str>),

-    #[error("Too many requests: {0}")]
-    TooManyRequests(Cow<'static, str>),
-
     #[error("Shutting down")]
     ShuttingDown,

@@ -76,10 +73,6 @@ impl ApiError {
             err.to_string(),
             StatusCode::SERVICE_UNAVAILABLE,
         ),
-        ApiError::TooManyRequests(err) => HttpErrorBody::response_from_msg_and_status(
-            err.to_string(),
-            StatusCode::TOO_MANY_REQUESTS,
-        ),
         ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
             err.to_string(),
             StatusCode::REQUEST_TIMEOUT,
@@ -14,19 +14,14 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
         }
         (Scope::PageServerApi, None) => Ok(()), // access to management api for PageServerApi scope
         (Scope::PageServerApi, Some(_)) => Ok(()), // access to tenant api using PageServerApi scope
-        (
-            Scope::Admin
-            | Scope::SafekeeperData
-            | Scope::GenerationsApi
-            | Scope::Infra
-            | Scope::Scrubber,
-            _,
-        ) => Err(AuthError(
-            format!(
-                "JWT scope '{:?}' is ineligible for Pageserver auth",
-                claims.scope
-            )
-            .into(),
-        )),
+        (Scope::Admin | Scope::SafekeeperData | Scope::GenerationsApi | Scope::Scrubber, _) => {
+            Err(AuthError(
+                format!(
+                    "JWT scope '{:?}' is ineligible for Pageserver auth",
+                    claims.scope
+                )
+                .into(),
+            ))
+        }
     }
 }
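A hedged sketch of the scope gate shown in this hunk: only tenant-scoped claims with a matching tenant and PageServerApi claims pass pageserver auth; every other scope is rejected regardless of the tenant argument. Types are simplified stand-ins for the crate's Claims and AuthError.

#[derive(Debug, Clone, Copy, PartialEq)]
enum Scope { Tenant, PageServerApi, GenerationsApi, Admin, SafekeeperData, Scrubber }

fn check_permission(scope: Scope, tenant_matches: bool) -> Result<(), String> {
    match (scope, tenant_matches) {
        (Scope::Tenant, true) => Ok(()),
        (Scope::Tenant, false) => Err("tenant id mismatch".into()),
        // Management and per-tenant API access for the pageserver scope.
        (Scope::PageServerApi, _) => Ok(()),
        // Every remaining scope is ineligible, whatever the tenant argument.
        (s, _) => Err(format!("JWT scope '{s:?}' is ineligible for Pageserver auth")),
    }
}

fn main() {
    assert!(check_permission(Scope::PageServerApi, false).is_ok());
    assert!(check_permission(Scope::Admin, true).is_err());
}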
@@ -715,8 +715,6 @@ async fn timeline_archival_config_handler(
         .tenant_manager
         .get_attached_tenant_shard(tenant_shard_id)?;

-    tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
-
     tenant
         .apply_timeline_archival_config(timeline_id, request_data.state, ctx)
         .await?;
@@ -493,8 +493,6 @@ pub struct OffloadedTimeline {
     pub tenant_shard_id: TenantShardId,
     pub timeline_id: TimelineId,
     pub ancestor_timeline_id: Option<TimelineId>,
-    /// Whether to retain the branch lsn at the ancestor or not
-    pub ancestor_retain_lsn: Option<Lsn>,

     // TODO: once we persist offloaded state, make this lazily constructed
     pub remote_client: Arc<RemoteTimelineClient>,

@@ -506,14 +504,10 @@ pub struct OffloadedTimeline {

 impl OffloadedTimeline {
     fn from_timeline(timeline: &Timeline) -> Self {
-        let ancestor_retain_lsn = timeline
-            .get_ancestor_timeline_id()
-            .map(|_timeline_id| timeline.get_ancestor_lsn());
         Self {
             tenant_shard_id: timeline.tenant_shard_id,
             timeline_id: timeline.timeline_id,
             ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
-            ancestor_retain_lsn,

             remote_client: timeline.remote_client.clone(),
             delete_progress: timeline.delete_progress.clone(),

@@ -521,12 +515,6 @@ impl OffloadedTimeline {
     }
 }

-#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
-pub enum MaybeOffloaded {
-    Yes,
-    No,
-}
-
 #[derive(Clone)]
 pub enum TimelineOrOffloaded {
     Timeline(Arc<Timeline>),
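A sketch of the deletion-side constructor logic: an offloaded timeline remembers the branch LSN at its ancestor only when it actually has an ancestor, so the retain LSN is Option-mapped off the ancestor id. Simplified types (u64 in place of Lsn/TimelineId); not the crate's actual definitions.

struct Timeline {
    // (ancestor_timeline_id, ancestor_lsn), if this timeline was branched.
    ancestor: Option<(u64, u64)>,
}

struct OffloadedTimeline {
    ancestor_timeline_id: Option<u64>,
    // Whether to retain the branch LSN at the ancestor or not.
    ancestor_retain_lsn: Option<u64>,
}

impl OffloadedTimeline {
    fn from_timeline(t: &Timeline) -> Self {
        Self {
            ancestor_timeline_id: t.ancestor.map(|(id, _)| id),
            ancestor_retain_lsn: t.ancestor.map(|(_, lsn)| lsn),
        }
    }
}

fn main() {
    let off = OffloadedTimeline::from_timeline(&Timeline { ancestor: Some((1, 0x40)) });
    assert_eq!(off.ancestor_timeline_id, Some(1));
    assert_eq!(off.ancestor_retain_lsn, Some(0x40));
}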
@@ -2265,13 +2253,12 @@ impl Tenant {

         if activating {
             let timelines_accessor = self.timelines.lock().unwrap();
-            let timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap();
             let timelines_to_activate = timelines_accessor
                 .values()
                 .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));

             // Before activation, populate each Timeline's GcInfo with information about its children
-            self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor);
+            self.initialize_gc_info(&timelines_accessor);

             // Spawn gc and compaction loops. The loops will shut themselves
             // down when they notice that the tenant is inactive.

@@ -3311,7 +3298,6 @@ impl Tenant {
     fn initialize_gc_info(
         &self,
         timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
-        timelines_offloaded: &std::sync::MutexGuard<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
     ) {
         // This function must be called before activation: after activation timeline create/delete operations
         // might happen, and this function is not safe to run concurrently with those.

@@ -3319,37 +3305,20 @@ impl Tenant {

         // Scan all timelines. For each timeline, remember the timeline ID and
         // the branch point where it was created.
-        let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId, MaybeOffloaded)>> =
-            BTreeMap::new();
+        let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId)>> = BTreeMap::new();
         timelines.iter().for_each(|(timeline_id, timeline_entry)| {
             if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() {
                 let ancestor_children = all_branchpoints.entry(*ancestor_timeline_id).or_default();
-                ancestor_children.push((
-                    timeline_entry.get_ancestor_lsn(),
-                    *timeline_id,
-                    MaybeOffloaded::No,
-                ));
+                ancestor_children.push((timeline_entry.get_ancestor_lsn(), *timeline_id));
             }
         });
-        timelines_offloaded
-            .iter()
-            .for_each(|(timeline_id, timeline_entry)| {
-                let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id else {
-                    return;
-                };
-                let Some(retain_lsn) = timeline_entry.ancestor_retain_lsn else {
-                    return;
-                };
-                let ancestor_children = all_branchpoints.entry(*ancestor_timeline_id).or_default();
-                ancestor_children.push((retain_lsn, *timeline_id, MaybeOffloaded::Yes));
-            });

         // The number of bytes we always keep, irrespective of PITR: this is a constant across timelines
         let horizon = self.get_gc_horizon();

         // Populate each timeline's GcInfo with information about its child branches
         for timeline in timelines.values() {
-            let mut branchpoints: Vec<(Lsn, TimelineId, MaybeOffloaded)> = all_branchpoints
+            let mut branchpoints: Vec<(Lsn, TimelineId)> = all_branchpoints
                 .remove(&timeline.timeline_id)
                 .unwrap_or_default();
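A minimal sketch of the branch-point aggregation in initialize_gc_info: group every child's (ancestor_lsn, child_id) pair under its ancestor timeline, then each timeline later takes its own bucket out of the map. On the deletion side the same map also folds in offloaded children tagged MaybeOffloaded::Yes. Plain u64s stand in for Lsn and TimelineId.

use std::collections::BTreeMap;

// children: (ancestor_timeline_id, ancestor_lsn, child_timeline_id)
fn branchpoints(children: &[(u64, u64, u64)]) -> BTreeMap<u64, Vec<(u64, u64)>> {
    let mut all: BTreeMap<u64, Vec<(u64, u64)>> = BTreeMap::new();
    for &(ancestor, lsn, child) in children {
        // Each ancestor accumulates the branch points of all its children.
        all.entry(ancestor).or_default().push((lsn, child));
    }
    all
}

fn main() {
    let map = branchpoints(&[(1, 0x40, 2), (1, 0x20, 3), (2, 0x80, 4)]);
    assert_eq!(map[&1], vec![(0x40, 2), (0x20, 3)]);
    assert_eq!(map[&2], vec![(0x80, 4)]);
}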
@@ -4909,10 +4878,7 @@ mod tests {
     {
         let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
         assert_eq!(branchpoints.len(), 1);
-        assert_eq!(
-            branchpoints[0],
-            (Lsn(0x40), NEW_TIMELINE_ID, MaybeOffloaded::No)
-        );
+        assert_eq!(branchpoints[0], (Lsn(0x40), NEW_TIMELINE_ID));
     }

     // You can read the key from the child branch even though the parent is

@@ -8295,8 +8261,8 @@ mod tests {
     let mut guard = tline.gc_info.write().unwrap();
     *guard = GcInfo {
         retain_lsns: vec![
-            (Lsn(0x10), tline.timeline_id, MaybeOffloaded::No),
-            (Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
+            (Lsn(0x10), tline.timeline_id),
+            (Lsn(0x20), tline.timeline_id),
         ],
         cutoffs: GcCutoffs {
             time: Lsn(0x30),

@@ -8523,8 +8489,8 @@ mod tests {
     let mut guard = tline.gc_info.write().unwrap();
     *guard = GcInfo {
         retain_lsns: vec![
-            (Lsn(0x10), tline.timeline_id, MaybeOffloaded::No),
-            (Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
+            (Lsn(0x10), tline.timeline_id),
+            (Lsn(0x20), tline.timeline_id),
         ],
         cutoffs: GcCutoffs {
             time: Lsn(0x30),

@@ -8757,7 +8723,7 @@ mod tests {
     // Update GC info
     let mut guard = parent_tline.gc_info.write().unwrap();
     *guard = GcInfo {
-        retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id, MaybeOffloaded::No)],
+        retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id)],
         cutoffs: GcCutoffs {
             time: Lsn(0x10),
             space: Lsn(0x10),

@@ -8771,7 +8737,7 @@ mod tests {
     // Update GC info
     let mut guard = branch_tline.gc_info.write().unwrap();
     *guard = GcInfo {
-        retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id, MaybeOffloaded::No)],
+        retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id)],
         cutoffs: GcCutoffs {
             time: Lsn(0x50),
             space: Lsn(0x50),
@@ -12,7 +12,7 @@ use crate::context::RequestContext;
 use crate::pgdatadir_mapping::CalculateLogicalSizeError;

 use super::{GcError, LogicalSizeCalculationCause, Tenant};
-use crate::tenant::{MaybeOffloaded, Timeline};
+use crate::tenant::Timeline;
 use utils::id::TimelineId;
 use utils::lsn::Lsn;

@@ -264,12 +264,10 @@ pub(super) async fn gather_inputs(
     let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
         .retain_lsns
         .iter()
-        .filter(|(lsn, _child_id, is_offloaded)| {
-            lsn > &ancestor_lsn && *is_offloaded == MaybeOffloaded::No
-        })
+        .filter(|(lsn, _child_id)| lsn > &ancestor_lsn)
         .copied()
         // this assumes there are no other retain_lsns than the branchpoints
-        .map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
+        .map(|(lsn, _child_id)| (lsn, LsnKind::BranchPoint))
         .collect::<Vec<_>>();

 lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
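A sketch of the LSN gathering in this hunk: keep only retain_lsns above the ancestor LSN and classify them as branch points; the deletion side additionally drops entries whose MaybeOffloaded flag is Yes. u64 stands in for Lsn.

#[derive(Debug, PartialEq)]
enum LsnKind { BranchPoint }

fn branch_point_lsns(retain_lsns: &[(u64, u64)], ancestor_lsn: u64) -> Vec<(u64, LsnKind)> {
    retain_lsns
        .iter()
        .filter(|(lsn, _child_id)| *lsn > ancestor_lsn)
        // This assumes there are no other retain_lsns than the branch points.
        .map(|(lsn, _child_id)| (*lsn, LsnKind::BranchPoint))
        .collect()
}

fn main() {
    let lsns = branch_point_lsns(&[(0x10, 1), (0x40, 2)], 0x20);
    assert_eq!(lsns, vec![(0x40, LsnKind::BranchPoint)]);
}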
@@ -392,10 +392,6 @@ impl InMemoryLayer {
         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     }

-    pub(crate) fn start_lsn(&self) -> Lsn {
-        self.start_lsn
-    }
-
     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
         self.start_lsn..self.end_lsn_or_max()
     }
@@ -139,10 +139,8 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};

use super::{
config::TenantConf,
storage_layer::{inmemory_layer, LayerVisibilityHint},
config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
upload_queue::NotInitialized,
MaybeOffloaded,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
@@ -452,7 +450,7 @@ pub(crate) struct GcInfo {
/// Currently, this includes all points where child branches have
/// been forked off from. In the future, could also include
/// explicit user-defined snapshot points.
pub(crate) retain_lsns: Vec<(Lsn, TimelineId, MaybeOffloaded)>,
pub(crate) retain_lsns: Vec<(Lsn, TimelineId)>,

/// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs,
@@ -469,13 +467,8 @@ impl GcInfo {
self.cutoffs.select_min()
}

pub(super) fn insert_child(
&mut self,
child_id: TimelineId,
child_lsn: Lsn,
is_offloaded: MaybeOffloaded,
) {
self.retain_lsns.push((child_lsn, child_id, is_offloaded));
pub(super) fn insert_child(&mut self, child_id: TimelineId, child_lsn: Lsn) {
self.retain_lsns.push((child_lsn, child_id));
self.retain_lsns.sort_by_key(|i| i.0);
}

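Note: in both sides of the `insert_child` hunk above, `retain_lsns` is kept sorted by LSN after every insertion. A minimal self-contained sketch of that bookkeeping, with simplified stand-in types:

    type Lsn = u64;
    type TimelineId = u32;

    #[derive(Default)]
    struct GcInfo {
        // (branch point, child timeline), kept sorted by LSN
        retain_lsns: Vec<(Lsn, TimelineId)>,
    }

    impl GcInfo {
        fn insert_child(&mut self, child_id: TimelineId, child_lsn: Lsn) {
            self.retain_lsns.push((child_lsn, child_id));
            self.retain_lsns.sort_by_key(|i| i.0);
        }
    }

    fn main() {
        let mut gc = GcInfo::default();
        gc.insert_child(7, 0x40);
        gc.insert_child(3, 0x18);
        // insertion order does not matter; the vec is ordered by LSN
        assert_eq!(gc.retain_lsns, vec![(0x18, 3), (0x40, 7)]);
    }
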
@@ -2171,9 +2164,7 @@ impl Timeline {

if let Some(ancestor) = &ancestor {
let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
// If we construct an explicit timeline object, it's obviously not offloaded
let is_offloaded = MaybeOffloaded::No;
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded);
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn());
}

Arc::new_cyclic(|myself| {
@@ -4884,7 +4875,7 @@ impl Timeline {
let retain_lsns = gc_info
.retain_lsns
.iter()
.map(|(lsn, _child_id, _is_offloaded)| *lsn)
.map(|(lsn, _child_id)| *lsn)
.collect();

// Gets the maximum LSN that holds the valid lease.

@@ -42,7 +42,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::tenant::DeltaLayer;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
@@ -639,28 +639,10 @@ impl Timeline {
let children = self.gc_info.read().unwrap().retain_lsns.clone();

let mut readable_points = Vec::with_capacity(children.len() + 1);
for (child_lsn, _child_timeline_id, is_offloaded) in &children {
if *is_offloaded == MaybeOffloaded::Yes {
continue;
}
for (child_lsn, _child_timeline_id) in &children {
readable_points.push(*child_lsn);
}
readable_points.push(head_lsn);

// The Timeline get page process will walk all InMemoryLayers before it starts walking historic
// layers. That means it might fail to see image layers that overlap with the LSN range of
// InMemoryLayers, so there is a de-facto read point at the start_lsn of the oldest InMemoryLayer.
//
// This behavior in the getpage path is considered a bug, and including InMemoryLayer's start_lsn here
// is a workaround. See https://github.com/neondatabase/neon/issues/9185
if let Some(oldest_inmemory_layer) = layer_map.frozen_layers.front() {
readable_points.push(oldest_inmemory_layer.start_lsn())
} else if let Some(open_layer) = layer_map.open_layer.as_ref() {
readable_points.push(open_layer.start_lsn());
}

readable_points.sort();

readable_points
};

@@ -1759,7 +1741,7 @@ impl Timeline {
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = gc_info.cutoffs.select_min();
for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
for (lsn, _timeline_id) in &gc_info.retain_lsns {
if lsn < &gc_cutoff {
retain_lsns_below_horizon.push(*lsn);
}

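Note: the `@@ -1759` hunk above selects retain points below the GC horizon, where the horizon is `cutoffs.select_min()`. A small runnable sketch of that selection with simplified types:

    // Every retained branch point below the horizon must stay readable
    // after GC; the effective horizon is the minimum of all cutoffs.
    type Lsn = u64;

    fn retain_below_horizon(retain_lsns: &[Lsn], cutoffs: &[Lsn]) -> (Lsn, Vec<Lsn>) {
        let gc_cutoff = *cutoffs.iter().min().expect("at least one cutoff");
        let below = retain_lsns.iter().copied().filter(|lsn| *lsn < gc_cutoff).collect();
        (gc_cutoff, below)
    }

    fn main() {
        let (cutoff, below) = retain_below_horizon(&[0x18, 0x40], &[0x30, 0x50]);
        assert_eq!(cutoff, 0x30);
        assert_eq!(below, vec![0x18]);
    }
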
@@ -43,7 +43,6 @@
#include "hll.h"
#include "bitmap.h"
#include "neon.h"
#include "neon_perf_counters.h"

#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)

@@ -115,9 +114,7 @@ typedef struct FileCacheControl
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
uint64 writes; /* number of writes issued */
uint64 time_read; /* time spent reading (us) */
uint64 time_write; /* time spent writing (us) */
uint64 writes;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
@@ -273,8 +270,6 @@ lfc_shmem_startup(void)
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
lfc_ctl->writes = 0;
lfc_ctl->time_read = 0;
lfc_ctl->time_write = 0;
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);

@@ -706,7 +701,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
Assert(blocks_in_chunk > 0);

for (int i = 0; i < blocks_in_chunk; i++)
@@ -801,13 +795,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
lfc_ctl->misses += iteration_misses;
pgBufferUsage.file_cache.hits += iteration_hits;
pgBufferUsage.file_cache.misses += iteration_misses;

if (iteration_hits)
{
lfc_ctl->time_read += io_time_us;
inc_page_cache_read_wait(io_time_us);
}

CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
@@ -872,7 +859,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
instr_time io_start, io_end;
Assert(blocks_in_chunk > 0);

for (int i = 0; i < blocks_in_chunk; i++)
@@ -961,13 +947,12 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

generation = lfc_ctl->generation;
entry_offset = entry->offset;
lfc_ctl->writes += blocks_in_chunk;
LWLockRelease(lfc_lock);

pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();

if (rc != BLCKSZ * blocks_in_chunk)
@@ -980,17 +965,9 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

if (lfc_ctl->generation == generation)
{
uint64 time_spent_us;
CriticalAssert(LFC_ENABLED());
/* Place entry to the head of LRU list */
CriticalAssert(entry->access_count > 0);

lfc_ctl->writes += blocks_in_chunk;
INSTR_TIME_SUBTRACT(io_start, io_end);
time_spent_us = INSTR_TIME_GET_MICROSEC(io_start);
lfc_ctl->time_write += time_spent_us;
inc_page_cache_write_wait(time_spent_us);

if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);

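Note: the instrumentation removed from `lfc_writev` follows a common measure-then-record pattern: timestamp around the syscall, convert to microseconds, then feed both a counter and a wait histogram. A sketch of the same pattern in Rust (the `timed_write` helper is illustrative, not repo code):

    use std::time::Instant;

    // Measure one write and report how long it took, in microseconds.
    fn timed_write(buf: &[u8], mut sink: impl std::io::Write) -> std::io::Result<u64> {
        let start = Instant::now();
        sink.write_all(buf)?;
        let elapsed_us = start.elapsed().as_micros() as u64;
        Ok(elapsed_us)
    }

    fn main() -> std::io::Result<()> {
        let us = timed_write(b"page", std::io::sink())?;
        println!("write took {us} us");
        Ok(())
    }
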
@@ -50,52 +50,28 @@ NeonPerfCountersShmemInit(void)
}
}

static inline void
inc_iohist(IOHistogram hist, uint64 latency_us)
/*
* Count a GetPage wait operation.
*/
void
inc_getpage_wait(uint64 latency_us)
{
int lo = 0;
int hi = NUM_IO_WAIT_BUCKETS - 1;
int hi = NUM_GETPAGE_WAIT_BUCKETS - 1;

/* Find the right bucket with binary search */
while (lo < hi)
{
int mid = (lo + hi) / 2;

if (latency_us < io_wait_bucket_thresholds[mid])
if (latency_us < getpage_wait_bucket_thresholds[mid])
hi = mid;
else
lo = mid + 1;
}
hist->wait_us_bucket[lo]++;
hist->wait_us_sum += latency_us;
hist->wait_us_count++;
}

/*
* Count a GetPage wait operation.
*/
void
inc_getpage_wait(uint64 latency)
{
inc_iohist(&MyNeonCounters->getpage_hist, latency);
}

/*
* Count an LFC read wait operation.
*/
void
inc_page_cache_read_wait(uint64 latency)
{
inc_iohist(&MyNeonCounters->file_cache_read_hist, latency);
}

/*
* Count an LFC write wait operation.
*/
void
inc_page_cache_write_wait(uint64 latency)
{
inc_iohist(&MyNeonCounters->file_cache_write_hist, latency);
MyNeonCounters->getpage_wait_us_bucket[lo]++;
MyNeonCounters->getpage_wait_us_sum += latency_us;
MyNeonCounters->getpage_wait_us_count++;
}

/*
@@ -105,91 +81,77 @@ inc_page_cache_write_wait(uint64 latency)

typedef struct
{
const char *name;
char *name;
bool is_bucket;
double bucket_le;
double value;
} metric_t;

static int
histogram_to_metrics(IOHistogram histogram,
metric_t *metrics,
const char *count,
const char *sum,
const char *bucket)
static metric_t *
neon_perf_counters_to_metrics(neon_per_backend_counters *counters)
{
int i = 0;
uint64 bucket_accum = 0;
#define NUM_METRICS (2 + NUM_GETPAGE_WAIT_BUCKETS + 8)
metric_t *metrics = palloc((NUM_METRICS + 1) * sizeof(metric_t));
uint64 bucket_accum;
int i = 0;

metrics[i].name = count;
metrics[i].name = "getpage_wait_seconds_count";
metrics[i].is_bucket = false;
metrics[i].value = (double) histogram->wait_us_count;
metrics[i].value = (double) counters->getpage_wait_us_count;
i++;
metrics[i].name = sum;
metrics[i].name = "getpage_wait_seconds_sum";
metrics[i].is_bucket = false;
metrics[i].value = (double) histogram->wait_us_sum / 1000000.0;
metrics[i].value = ((double) counters->getpage_wait_us_sum) / 1000000.0;
i++;
for (int bucketno = 0; bucketno < NUM_IO_WAIT_BUCKETS; bucketno++)

bucket_accum = 0;
for (int bucketno = 0; bucketno < NUM_GETPAGE_WAIT_BUCKETS; bucketno++)
{
uint64 threshold = io_wait_bucket_thresholds[bucketno];
uint64 threshold = getpage_wait_bucket_thresholds[bucketno];

bucket_accum += histogram->wait_us_bucket[bucketno];
bucket_accum += counters->getpage_wait_us_bucket[bucketno];

metrics[i].name = bucket;
metrics[i].name = "getpage_wait_seconds_bucket";
metrics[i].is_bucket = true;
metrics[i].bucket_le = (threshold == UINT64_MAX) ? INFINITY : ((double) threshold) / 1000000.0;
metrics[i].value = (double) bucket_accum;
i++;
}

return i;
}

static metric_t *
neon_perf_counters_to_metrics(neon_per_backend_counters *counters)
{
#define NUM_METRICS ((2 + NUM_IO_WAIT_BUCKETS) * 3 + 10)
metric_t *metrics = palloc((NUM_METRICS + 1) * sizeof(metric_t));
int i = 0;

#define APPEND_METRIC(_name) do { \
metrics[i].name = #_name; \
metrics[i].is_bucket = false; \
metrics[i].value = (double) counters->_name; \
i++; \
} while (false)

i += histogram_to_metrics(&counters->getpage_hist, &metrics[i],
"getpage_wait_seconds_count",
"getpage_wait_seconds_sum",
"getpage_wait_seconds_bucket");

APPEND_METRIC(getpage_prefetch_requests_total);
APPEND_METRIC(getpage_sync_requests_total);
APPEND_METRIC(getpage_prefetch_misses_total);
APPEND_METRIC(getpage_prefetch_discards_total);
APPEND_METRIC(pageserver_requests_sent_total);
APPEND_METRIC(pageserver_disconnects_total);
APPEND_METRIC(pageserver_send_flushes_total);
APPEND_METRIC(pageserver_open_requests);
APPEND_METRIC(getpage_prefetches_buffered);

APPEND_METRIC(file_cache_hits_total);

i += histogram_to_metrics(&counters->file_cache_read_hist, &metrics[i],
"file_cache_read_wait_seconds_count",
"file_cache_read_wait_seconds_sum",
"file_cache_read_wait_seconds_bucket");
i += histogram_to_metrics(&counters->file_cache_write_hist, &metrics[i],
"file_cache_write_wait_seconds_count",
"file_cache_write_wait_seconds_sum",
"file_cache_write_wait_seconds_bucket");
metrics[i].name = "getpage_prefetch_requests_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->getpage_prefetch_requests_total;
i++;
metrics[i].name = "getpage_sync_requests_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->getpage_sync_requests_total;
i++;
metrics[i].name = "getpage_prefetch_misses_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->getpage_prefetch_misses_total;
i++;
metrics[i].name = "getpage_prefetch_discards_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->getpage_prefetch_discards_total;
i++;
metrics[i].name = "pageserver_requests_sent_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->pageserver_requests_sent_total;
i++;
metrics[i].name = "pageserver_disconnects_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->pageserver_disconnects_total;
i++;
metrics[i].name = "pageserver_send_flushes_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->pageserver_send_flushes_total;
i++;
metrics[i].name = "file_cache_hits_total";
metrics[i].is_bucket = false;
metrics[i].value = (double) counters->file_cache_hits_total;
i++;

Assert(i == NUM_METRICS);

#undef APPEND_METRIC
#undef NUM_METRICS

/* NULL entry marks end of array */
metrics[i].name = NULL;
metrics[i].value = 0;
@@ -254,15 +216,6 @@ neon_get_backend_perf_counters(PG_FUNCTION_ARGS)
return (Datum) 0;
}

static inline void
histogram_merge_into(IOHistogram into, IOHistogram from)
{
into->wait_us_count += from->wait_us_count;
into->wait_us_sum += from->wait_us_sum;
for (int bucketno = 0; bucketno < NUM_IO_WAIT_BUCKETS; bucketno++)
into->wait_us_bucket[bucketno] += from->wait_us_bucket[bucketno];
}

PG_FUNCTION_INFO_V1(neon_get_perf_counters);
Datum
neon_get_perf_counters(PG_FUNCTION_ARGS)
@@ -281,7 +234,10 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
{
neon_per_backend_counters *counters = &neon_per_backend_counters_shared[procno];

histogram_merge_into(&totals.getpage_hist, &counters->getpage_hist);
totals.getpage_wait_us_count += counters->getpage_wait_us_count;
totals.getpage_wait_us_sum += counters->getpage_wait_us_sum;
for (int bucketno = 0; bucketno < NUM_GETPAGE_WAIT_BUCKETS; bucketno++)
totals.getpage_wait_us_bucket[bucketno] += counters->getpage_wait_us_bucket[bucketno];
totals.getpage_prefetch_requests_total += counters->getpage_prefetch_requests_total;
totals.getpage_sync_requests_total += counters->getpage_sync_requests_total;
totals.getpage_prefetch_misses_total += counters->getpage_prefetch_misses_total;
@@ -289,11 +245,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
totals.pageserver_requests_sent_total += counters->pageserver_requests_sent_total;
totals.pageserver_disconnects_total += counters->pageserver_disconnects_total;
totals.pageserver_send_flushes_total += counters->pageserver_send_flushes_total;
totals.pageserver_open_requests += counters->pageserver_open_requests;
totals.getpage_prefetches_buffered += counters->getpage_prefetches_buffered;
totals.file_cache_hits_total += counters->file_cache_hits_total;
histogram_merge_into(&totals.file_cache_read_hist, &counters->file_cache_read_hist);
histogram_merge_into(&totals.file_cache_write_hist, &counters->file_cache_write_hist);
}

metrics = neon_perf_counters_to_metrics(&totals);

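Note: in `histogram_to_metrics` above, each histogram is flattened into a `_count`, a `_sum` (converted to seconds), and one cumulative `_bucket` sample per threshold, matching Prometheus histogram conventions. A sketch of that conversion in Rust (names are illustrative, not repo APIs):

    struct Metric { name: &'static str, bucket_le: Option<f64>, value: f64 }

    fn histogram_to_metrics(
        name_count: &'static str, name_sum: &'static str, name_bucket: &'static str,
        count: u64, sum_us: u64, buckets: &[u64], thresholds_us: &[u64],
    ) -> Vec<Metric> {
        let mut out = vec![
            Metric { name: name_count, bucket_le: None, value: count as f64 },
            Metric { name: name_sum, bucket_le: None, value: sum_us as f64 / 1e6 },
        ];
        let mut accum = 0u64;
        for (&b, &t) in buckets.iter().zip(thresholds_us) {
            accum += b; // prometheus buckets are cumulative
            let le = if t == u64::MAX { f64::INFINITY } else { t as f64 / 1e6 };
            out.push(Metric { name: name_bucket, bucket_le: Some(le), value: accum as f64 });
        }
        out
    }
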
@@ -15,26 +15,17 @@
#include "storage/proc.h"
#endif

static const uint64 io_wait_bucket_thresholds[] = {
2, 3, 6, 10, /* 0 us - 10 us */
20, 30, 60, 100, /* 10 us - 100 us */
static const uint64 getpage_wait_bucket_thresholds[] = {
20, 30, 60, 100, /* 0 - 100 us */
200, 300, 600, 1000, /* 100 us - 1 ms */
2000, 3000, 6000, 10000, /* 1 ms - 10 ms */
20000, 30000, 60000, 100000, /* 10 ms - 100 ms */
200000, 300000, 600000, 1000000, /* 100 ms - 1 s */
2000000, 3000000, 6000000, 10000000, /* 1 s - 10 s */
20000000, 30000000, 60000000, 100000000, /* 10 s - 100 s */
UINT64_MAX,
};
#define NUM_IO_WAIT_BUCKETS (lengthof(io_wait_bucket_thresholds))

typedef struct IOHistogramData
{
uint64 wait_us_count;
uint64 wait_us_sum;
uint64 wait_us_bucket[NUM_IO_WAIT_BUCKETS];
} IOHistogramData;

typedef IOHistogramData *IOHistogram;
#define NUM_GETPAGE_WAIT_BUCKETS (lengthof(getpage_wait_bucket_thresholds))

typedef struct
{
@@ -48,7 +39,9 @@ typedef struct
* the backend, but the 'neon_backend_perf_counters' view will convert
* them to seconds, to make them more idiomatic as prometheus metrics.
*/
IOHistogramData getpage_hist;
uint64 getpage_wait_us_count;
uint64 getpage_wait_us_sum;
uint64 getpage_wait_us_bucket[NUM_GETPAGE_WAIT_BUCKETS];

/*
* Total number of speculative prefetch Getpage requests and synchronous
@@ -57,11 +50,7 @@ typedef struct
uint64 getpage_prefetch_requests_total;
uint64 getpage_sync_requests_total;

/*
* Total number of readahead misses; consisting of either prefetches that
* don't satisfy the LSN bounds, or cases where no readahead was issued
* for the read.
*/
/* XXX: It's not clear to me when these misses happen. */
uint64 getpage_prefetch_misses_total;

/*
@@ -91,16 +80,6 @@ typedef struct
* this can be smaller than pageserver_requests_sent_total.
*/
uint64 pageserver_send_flushes_total;

/*
* Number of open requests to PageServer.
*/
uint64 pageserver_open_requests;

/*
* Number of unused prefetches currently cached in this backend.
*/
uint64 getpage_prefetches_buffered;

/*
* Number of requests satisfied from the LFC.
@@ -112,9 +91,6 @@ typedef struct
*/
uint64 file_cache_hits_total;

/* LFC I/O time buckets */
IOHistogramData file_cache_read_hist;
IOHistogramData file_cache_write_hist;
} neon_per_backend_counters;

/* Pointer to the shared memory array of neon_per_backend_counters structs */
@@ -135,8 +111,6 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
#endif

extern void inc_getpage_wait(uint64 latency);
extern void inc_page_cache_read_wait(uint64 latency);
extern void inc_page_cache_write_wait(uint64 latency);

extern Size NeonPerfCountersShmemSize(void);
extern void NeonPerfCountersShmemInit(void);

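Note: `inc_iohist`/`inc_getpage_wait` pick a bucket by binary search over these upper-bound thresholds, with `UINT64_MAX` as the catch-all. A runnable Rust sketch of the same lookup:

    // thresholds are exclusive upper bounds; the last entry catches everything
    fn bucket_index(thresholds: &[u64], latency_us: u64) -> usize {
        let (mut lo, mut hi) = (0, thresholds.len() - 1);
        while lo < hi {
            let mid = (lo + hi) / 2;
            if latency_us < thresholds[mid] { hi = mid } else { lo = mid + 1 }
        }
        lo
    }

    fn main() {
        let t = [20, 30, 60, 100, u64::MAX];
        assert_eq!(bucket_index(&t, 10), 0);          // < 20 us
        assert_eq!(bucket_index(&t, 45), 2);          // 30..60 us
        assert_eq!(bucket_index(&t, 1_000_000), 4);   // overflow bucket
    }
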
@@ -488,11 +488,6 @@ readahead_buffer_resize(int newsize, void *extra)
newPState->n_unused -= 1;
}

MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
MyNeonCounters->pageserver_open_requests =
MyPState->n_requests_inflight;

for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1)
{
prefetch_set_unused(end);
@@ -626,8 +621,6 @@ prefetch_read(PrefetchRequest *slot)
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
MyPState->ring_receive += 1;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;

/* update slot state */
slot->status = PRFS_RECEIVED;
@@ -681,15 +674,6 @@ prefetch_on_ps_disconnect(void)

prefetch_set_unused(ring_index);
}

/*
* We can have gone into retry due to network error, so update stats with
* the latest available
*/
MyNeonCounters->pageserver_open_requests =
MyPState->n_requests_inflight;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
}

/*
@@ -722,9 +706,6 @@ prefetch_set_unused(uint64 ring_index)

MyPState->n_responses_buffered -= 1;
MyPState->n_unused += 1;

MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
}
else
{
@@ -839,15 +820,6 @@ prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
hashkey.buftag = tag;

Retry:
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
*/
MyNeonCounters->pageserver_open_requests =
MyPState->ring_unused - MyPState->ring_receive;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;

min_ring_index = UINT64_MAX;
for (int i = 0; i < nblocks; i++)
{
@@ -1029,9 +1001,6 @@ Retry:
prefetch_do_request(slot, lsns);
}

MyNeonCounters->pageserver_open_requests =
MyPState->ring_unused - MyPState->ring_receive;

Assert(any_hits);

Assert(GetPrfSlot(min_ring_index)->status == PRFS_REQUESTED ||
@@ -1107,10 +1076,8 @@ page_server_request(void const *req)
{
/* do nothing */
}
MyNeonCounters->pageserver_open_requests++;
consume_prefetch_responses();
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
}
PG_CATCH();
{
@@ -1119,8 +1086,6 @@ page_server_request(void const *req)
* point, but this currently seems fine for now.
*/
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;

PG_RE_THROW();
}
PG_END_TRY();

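Note: `page_server_request` above brackets the blocking receive with gauge updates and zeroes the gauge on the disconnect path, so the metric cannot stay stuck after an error. A Rust sketch of that pattern (the atomic gauge and helper are illustrative):

    use std::sync::atomic::{AtomicU64, Ordering};

    static OPEN_REQUESTS: AtomicU64 = AtomicU64::new(0);

    fn page_server_roundtrip(
        recv: impl FnOnce() -> Result<Vec<u8>, std::io::Error>,
    ) -> Result<Vec<u8>, std::io::Error> {
        OPEN_REQUESTS.fetch_add(1, Ordering::Relaxed);
        let res = recv();
        match &res {
            Ok(_) => { OPEN_REQUESTS.fetch_sub(1, Ordering::Relaxed); }
            // on error the connection is dropped, so no requests remain open
            Err(_) => OPEN_REQUESTS.store(0, Ordering::Relaxed),
        }
        res
    }
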
@@ -1,24 +1,18 @@
use crate::{
auth,
cache::Cached,
compute,
auth, compute,
config::AuthenticationConfig,
context::RequestMonitoring,
control_plane::{self, provider::NodeInfo, CachedNodeInfo},
control_plane::{self, provider::NodeInfo},
error::{ReportableError, UserFacingError},
proxy::connect_compute::ComputeConnectBackend,
stream::PqStream,
waiters,
};
use async_trait::async_trait;
use pq_proto::BeMessage as Be;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_postgres::config::SslMode;
use tracing::{info, info_span};

use super::ComputeCredentialKeys;

#[derive(Debug, Error)]
pub(crate) enum WebAuthError {
#[error(transparent)]
@@ -31,11 +25,6 @@ pub(crate) enum WebAuthError {
Io(#[from] std::io::Error),
}

#[derive(Debug)]
pub struct ConsoleRedirectBackend {
console_uri: reqwest::Url,
}

impl UserFacingError for WebAuthError {
fn to_string_client(&self) -> String {
"Internal error".to_string()
@@ -68,40 +57,7 @@ pub(crate) fn new_psql_session_id() -> String {
hex::encode(rand::random::<[u8; 8]>())
}

impl ConsoleRedirectBackend {
pub fn new(console_uri: reqwest::Url) -> Self {
Self { console_uri }
}

pub(crate) async fn authenticate(
&self,
ctx: &RequestMonitoring,
auth_config: &'static AuthenticationConfig,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<ConsoleRedirectNodeInfo> {
authenticate(ctx, auth_config, &self.console_uri, client)
.await
.map(ConsoleRedirectNodeInfo)
}
}

pub struct ConsoleRedirectNodeInfo(pub(super) NodeInfo);

#[async_trait]
impl ComputeConnectBackend for ConsoleRedirectNodeInfo {
async fn wake_compute(
&self,
_ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
Ok(Cached::new_uncached(self.0.clone()))
}

fn get_keys(&self) -> &ComputeCredentialKeys {
&ComputeCredentialKeys::None
}
}

async fn authenticate(
pub(super) async fn authenticate(
ctx: &RequestMonitoring,
auth_config: &'static AuthenticationConfig,
link_uri: &reqwest::Url,

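Note: in both versions of this flow, web authentication itself yields the compute `NodeInfo`, so the later `wake_compute` step just returns it uncached instead of querying the control plane. A tiny sketch with simplified types:

    #[derive(Clone)]
    struct NodeInfo { host: String }

    struct ConsoleRedirectNodeInfo(NodeInfo);

    impl ConsoleRedirectNodeInfo {
        // wake_compute is effectively a no-op: the node was already
        // provisioned during the web-auth handshake
        fn wake_compute(&self) -> NodeInfo {
            self.0.clone()
        }
    }

    fn main() {
        let node = ConsoleRedirectNodeInfo(NodeInfo { host: "compute-1.local".into() });
        assert_eq!(node.wake_compute().host, "compute-1.local");
    }
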
@@ -8,7 +8,6 @@ use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;

pub use console_redirect::ConsoleRedirectBackend;
pub(crate) use console_redirect::WebAuthError;
use ipnet::{Ipv4Net, Ipv6Net};
use local::LocalBackend;
@@ -22,7 +21,7 @@ use crate::cache::Cached;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::GetAuthInfoError;
use crate::control_plane::provider::{CachedRoleSecret, ControlPlaneBackend};
use crate::control_plane::AuthSecret;
use crate::control_plane::{AuthSecret, NodeInfo};
use crate::intern::EndpointIdInt;
use crate::metrics::Metrics;
use crate::proxy::connect_compute::ComputeConnectBackend;
@@ -37,7 +36,7 @@ use crate::{
provider::{CachedAllowedIps, CachedNodeInfo},
Api,
},
stream,
stream, url,
};
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};

@@ -66,9 +65,11 @@ impl<T> std::ops::Deref for MaybeOwned<'_, T> {
/// * However, when we substitute `T` with [`ComputeUserInfoMaybeEndpoint`],
/// this helps us provide the credentials only to those auth
/// backends which require them for the authentication process.
pub enum Backend<'a, T> {
pub enum Backend<'a, T, D> {
/// Cloud API (V2).
ControlPlane(MaybeOwned<'a, ControlPlaneBackend>, T),
/// Authentication via a web browser.
ConsoleRedirect(MaybeOwned<'a, url::ApiUrl>, D),
/// Local proxy uses configured auth credentials and does not wake compute
Local(MaybeOwned<'a, LocalBackend>),
}
@@ -89,7 +90,7 @@ impl Clone for Box<dyn TestBackend> {
}
}

impl std::fmt::Display for Backend<'_, ()> {
impl std::fmt::Display for Backend<'_, (), ()> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ControlPlane(api, ()) => match &**api {
@@ -105,39 +106,46 @@ impl std::fmt::Display for Backend<'_, ()> {
#[cfg(test)]
ControlPlaneBackend::Test(_) => fmt.debug_tuple("ControlPlane::Test").finish(),
},
Self::ConsoleRedirect(url, ()) => fmt
.debug_tuple("ConsoleRedirect")
.field(&url.as_str())
.finish(),
Self::Local(_) => fmt.debug_tuple("Local").finish(),
}
}
}

impl<T> Backend<'_, T> {
impl<T, D> Backend<'_, T, D> {
/// Very similar to [`std::option::Option::as_ref`].
/// This helps us pass structured config to async tasks.
pub(crate) fn as_ref(&self) -> Backend<'_, &T> {
pub(crate) fn as_ref(&self) -> Backend<'_, &T, &D> {
match self {
Self::ControlPlane(c, x) => Backend::ControlPlane(MaybeOwned::Borrowed(c), x),
Self::ConsoleRedirect(c, x) => Backend::ConsoleRedirect(MaybeOwned::Borrowed(c), x),
Self::Local(l) => Backend::Local(MaybeOwned::Borrowed(l)),
}
}
}

impl<'a, T> Backend<'a, T> {
impl<'a, T, D> Backend<'a, T, D> {
/// Very similar to [`std::option::Option::map`].
/// Maps [`Backend<T>`] to [`Backend<R>`] by applying
/// a function to a contained value.
pub(crate) fn map<R>(self, f: impl FnOnce(T) -> R) -> Backend<'a, R> {
pub(crate) fn map<R>(self, f: impl FnOnce(T) -> R) -> Backend<'a, R, D> {
match self {
Self::ControlPlane(c, x) => Backend::ControlPlane(c, f(x)),
Self::ConsoleRedirect(c, x) => Backend::ConsoleRedirect(c, x),
Self::Local(l) => Backend::Local(l),
}
}
}
impl<'a, T, E> Backend<'a, Result<T, E>> {
impl<'a, T, D, E> Backend<'a, Result<T, E>, D> {
/// Very similar to [`std::option::Option::transpose`].
/// This is most useful for error handling.
pub(crate) fn transpose(self) -> Result<Backend<'a, T>, E> {
pub(crate) fn transpose(self) -> Result<Backend<'a, T, D>, E> {
match self {
Self::ControlPlane(c, x) => x.map(|x| Backend::ControlPlane(c, x)),
Self::ConsoleRedirect(c, x) => Ok(Backend::ConsoleRedirect(c, x)),
Self::Local(l) => Ok(Backend::Local(l)),
}
}
@@ -233,6 +241,7 @@ impl AuthenticationConfig {
pub(crate) fn check_rate_limit(
&self,
ctx: &RequestMonitoring,
config: &AuthenticationConfig,
secret: AuthSecret,
endpoint: &EndpointId,
is_cleartext: bool,
@@ -256,7 +265,7 @@ impl AuthenticationConfig {
let limit_not_exceeded = self.rate_limiter.check(
(
endpoint_int,
MaskedIp::new(ctx.peer_addr(), self.rate_limit_ip_subnet),
MaskedIp::new(ctx.peer_addr(), config.rate_limit_ip_subnet),
),
password_weight,
);
@@ -330,6 +339,7 @@ async fn auth_quirks(
let secret = if let Some(secret) = secret {
config.check_rate_limit(
ctx,
config,
secret,
&info.endpoint,
unauthenticated_password.is_some() || allow_cleartext,
@@ -405,11 +415,12 @@ async fn authenticate_with_secret(
classic::authenticate(ctx, info, client, config, secret).await
}

impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint, &()> {
/// Get username from the credentials.
pub(crate) fn get_user(&self) -> &str {
match self {
Self::ControlPlane(_, user_info) => &user_info.user,
Self::ConsoleRedirect(_, ()) => "web",
Self::Local(_) => "local",
}
}
@@ -423,7 +434,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
allow_cleartext: bool,
config: &'static AuthenticationConfig,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> auth::Result<Backend<'a, ComputeCredentials>> {
) -> auth::Result<Backend<'a, ComputeCredentials, NodeInfo>> {
let res = match self {
Self::ControlPlane(api, user_info) => {
info!(
@@ -444,6 +455,14 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
.await?;
Backend::ControlPlane(api, credentials)
}
// NOTE: this auth backend doesn't use client credentials.
Self::ConsoleRedirect(url, ()) => {
info!("performing web authentication");

let info = console_redirect::authenticate(ctx, config, &url, client).await?;

Backend::ConsoleRedirect(url, info)
}
Self::Local(_) => {
return Err(auth::AuthError::bad_auth_method("invalid for local proxy"))
}
@@ -454,13 +473,14 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
}
}

impl Backend<'_, ComputeUserInfo> {
impl Backend<'_, ComputeUserInfo, &()> {
pub(crate) async fn get_role_secret(
&self,
ctx: &RequestMonitoring,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
match self {
Self::ControlPlane(api, user_info) => api.get_role_secret(ctx, user_info).await,
Self::ConsoleRedirect(_, ()) => Ok(Cached::new_uncached(None)),
Self::Local(_) => Ok(Cached::new_uncached(None)),
}
}
@@ -473,19 +493,21 @@ impl Backend<'_, ComputeUserInfo> {
Self::ControlPlane(api, user_info) => {
api.get_allowed_ips_and_secret(ctx, user_info).await
}
Self::ConsoleRedirect(_, ()) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
}
}
}

#[async_trait::async_trait]
impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
impl ComputeConnectBackend for Backend<'_, ComputeCredentials, NodeInfo> {
async fn wake_compute(
&self,
ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
match self {
Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await,
Self::ConsoleRedirect(_, info) => Ok(Cached::new_uncached(info.clone())),
Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())),
}
}
@@ -493,6 +515,31 @@ impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
fn get_keys(&self) -> &ComputeCredentialKeys {
match self {
Self::ControlPlane(_, creds) => &creds.keys,
Self::ConsoleRedirect(_, _) => &ComputeCredentialKeys::None,
Self::Local(_) => &ComputeCredentialKeys::None,
}
}
}

#[async_trait::async_trait]
impl ComputeConnectBackend for Backend<'_, ComputeCredentials, &()> {
async fn wake_compute(
&self,
ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
match self {
Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await,
Self::ConsoleRedirect(_, ()) => {
unreachable!("web auth flow doesn't support waking the compute")
}
Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())),
}
}

fn get_keys(&self) -> &ComputeCredentialKeys {
match self {
Self::ControlPlane(_, creds) => &creds.keys,
Self::ConsoleRedirect(_, ()) => &ComputeCredentialKeys::None,
Self::Local(_) => &ComputeCredentialKeys::None,
}
}

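Note: `Backend<'a, T, D>` above mirrors `Option`'s `as_ref`/`map`/`transpose` combinators over two payload slots. A stripped-down sketch of the same shape (lifetimes and the `Local` variant omitted, names simplified):

    enum Backend<T, D> {
        ControlPlane(T),
        ConsoleRedirect(D),
    }

    impl<T, D> Backend<T, D> {
        fn as_ref(&self) -> Backend<&T, &D> {
            match self {
                Backend::ControlPlane(t) => Backend::ControlPlane(t),
                Backend::ConsoleRedirect(d) => Backend::ConsoleRedirect(d),
            }
        }
        // only the ControlPlane payload is mapped; the redirect slot passes through
        fn map<R>(self, f: impl FnOnce(T) -> R) -> Backend<R, D> {
            match self {
                Backend::ControlPlane(t) => Backend::ControlPlane(f(t)),
                Backend::ConsoleRedirect(d) => Backend::ConsoleRedirect(d),
            }
        }
    }

    impl<T, D, E> Backend<Result<T, E>, D> {
        fn transpose(self) -> Result<Backend<T, D>, E> {
            match self {
                Backend::ControlPlane(r) => r.map(Backend::ControlPlane),
                Backend::ConsoleRedirect(d) => Ok(Backend::ConsoleRedirect(d)),
            }
        }
    }
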
@@ -6,12 +6,9 @@ use compute_api::spec::LocalProxySpec;
use dashmap::DashMap;
use futures::future::Either;
use proxy::{
auth::{
self,
backend::{
jwt::JwkCache,
local::{LocalBackend, JWKS_ROLE_MAP},
},
auth::backend::{
jwt::JwkCache,
local::{LocalBackend, JWKS_ROLE_MAP},
},
cancellation::CancellationHandlerMain,
config::{self, AuthenticationConfig, HttpConfig, ProxyConfig, RetryConfig},
@@ -135,7 +132,6 @@ async fn main() -> anyhow::Result<()> {

let args = LocalProxyCliArgs::parse();
let config = build_config(&args)?;
let auth_backend = build_auth_backend(&args)?;

// before we bind to any ports, write the process ID to a file
// so that compute-ctl can find our process later
@@ -197,7 +193,6 @@ async fn main() -> anyhow::Result<()> {

let task = serverless::task_main(
config,
auth_backend,
http_listener,
shutdown.clone(),
Arc::new(CancellationHandlerMain::new(
@@ -262,6 +257,9 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig

Ok(Box::leak(Box::new(ProxyConfig {
tls_config: None,
auth_backend: proxy::auth::Backend::Local(proxy::auth::backend::MaybeOwned::Owned(
LocalBackend::new(args.compute),
)),
metric_collection: None,
allow_self_signed_compute: false,
http_config,
@@ -288,17 +286,6 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
})))
}

/// auth::Backend is created at proxy startup, and lives forever.
fn build_auth_backend(
args: &LocalProxyCliArgs,
) -> anyhow::Result<&'static auth::Backend<'static, ()>> {
let auth_backend = proxy::auth::Backend::Local(proxy::auth::backend::MaybeOwned::Owned(
LocalBackend::new(args.compute),
));

Ok(Box::leak(Box::new(auth_backend)))
}

async fn refresh_config_loop(path: Utf8PathBuf, rx: Arc<Notify>) {
loop {
rx.notified().await;

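Note: both binaries leak their startup configuration with `Box::leak` to obtain a `&'static` reference that spawned tasks can share without reference counting. A minimal sketch of the pattern (the `Config` struct here is hypothetical):

    struct Config { region: String }

    // built once at startup; the allocation is intentionally never freed
    fn build_config(region: &str) -> &'static Config {
        Box::leak(Box::new(Config { region: region.to_string() }))
    }

    fn main() {
        let config: &'static Config = build_config("eu-west-1");
        std::thread::spawn(move || println!("region: {}", config.region))
            .join()
            .unwrap();
    }
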
@@ -10,7 +10,6 @@ use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::jwt::JwkCache;
use proxy::auth::backend::AuthRateLimiter;
use proxy::auth::backend::ConsoleRedirectBackend;
use proxy::auth::backend::MaybeOwned;
use proxy::cancellation::CancelMap;
use proxy::cancellation::CancellationHandler;
@@ -312,12 +311,8 @@ async fn main() -> anyhow::Result<()> {

let args = ProxyCliArgs::parse();
let config = build_config(&args)?;
let auth_backend = build_auth_backend(&args)?;

match auth_backend {
Either::Left(auth_backend) => info!("Authentication backend: {auth_backend}"),
Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
};
info!("Authentication backend: {}", config.auth_backend);
info!("Using region: {}", args.aws_region);

let region_provider =
@@ -464,41 +459,24 @@ async fn main() -> anyhow::Result<()> {
// client facing tasks. these will exit on error or on cancellation
// cancellation returns Ok(())
let mut client_tasks = JoinSet::new();
match auth_backend {
Either::Left(auth_backend) => {
if let Some(proxy_listener) = proxy_listener {
client_tasks.spawn(proxy::proxy::task_main(
config,
auth_backend,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
));
}
if let Some(proxy_listener) = proxy_listener {
client_tasks.spawn(proxy::proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
));
}

if let Some(serverless_listener) = serverless_listener {
client_tasks.spawn(serverless::task_main(
config,
auth_backend,
serverless_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
));
}
}
Either::Right(auth_backend) => {
if let Some(proxy_listener) = proxy_listener {
client_tasks.spawn(proxy::console_redirect_proxy::task_main(
config,
auth_backend,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
));
}
}
if let Some(serverless_listener) = serverless_listener {
client_tasks.spawn(serverless::task_main(
config,
serverless_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
));
}

client_tasks.spawn(proxy::context::parquet::worker(
@@ -528,7 +506,7 @@ async fn main() -> anyhow::Result<()> {
));
}

if let Either::Left(auth::Backend::ControlPlane(api, _)) = &auth_backend {
if let auth::Backend::ControlPlane(api, _) = &config.auth_backend {
if let proxy::control_plane::provider::ControlPlaneBackend::Management(api) = &**api {
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
@@ -632,83 +610,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
bail!("dynamic rate limiter should be disabled");
}

let config::ConcurrencyLockOptions {
shards,
limiter,
epoch,
timeout,
} = args.connect_compute_lock.parse()?;
info!(
?limiter,
shards,
?epoch,
"Using NodeLocks (connect_compute)"
);
let connect_compute_locks = control_plane::locks::ApiLocks::new(
"connect_compute_lock",
limiter,
shards,
timeout,
epoch,
&Metrics::get().proxy.connect_compute_lock,
)?;

let http_config = HttpConfig {
accept_websockets: !args.is_auth_broker,
pool_options: GlobalConnPoolOptions {
max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint,
gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch,
pool_shards: args.sql_over_http.sql_over_http_pool_shards,
idle_timeout: args.sql_over_http.sql_over_http_idle_timeout,
opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
},
cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards),
client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold,
max_request_size_bytes: args.sql_over_http.sql_over_http_max_request_size_bytes,
max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes,
};
let authentication_config = AuthenticationConfig {
jwks_cache: JwkCache::default(),
thread_pool,
scram_protocol_timeout: args.scram_protocol_timeout,
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
ip_allowlist_check_enabled: !args.is_private_access_proxy,
is_auth_broker: args.is_auth_broker,
accept_jwts: args.is_auth_broker,
webauth_confirmation_timeout: args.webauth_confirmation_timeout,
};

let config = ProxyConfig {
tls_config,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
proxy_protocol_v2: args.proxy_protocol_v2,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_compute_locks,
connect_to_compute_retry_config: config::RetryConfig::parse(
&args.connect_to_compute_retry,
)?,
};

let config = Box::leak(Box::new(config));

tokio::spawn(config.connect_compute_locks.garbage_collect_worker());

Ok(config)
}

/// auth::Backend is created at proxy startup, and lives forever.
fn build_auth_backend(
args: &ProxyCliArgs,
) -> anyhow::Result<Either<&'static auth::Backend<'static, ()>, &'static ConsoleRedirectBackend>> {
match &args.auth_backend {
let auth_backend = match &args.auth_backend {
AuthBackendType::Console => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
@@ -758,11 +660,12 @@ fn build_auth_backend(
wake_compute_endpoint_rate_limiter,
);
let api = control_plane::provider::ControlPlaneBackend::Management(api);
let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
auth::Backend::ControlPlane(MaybeOwned::Owned(api), ())
}

let config = Box::leak(Box::new(auth_backend));

Ok(Either::Left(config))
AuthBackendType::Web => {
let url = args.uri.parse()?;
auth::Backend::ConsoleRedirect(MaybeOwned::Owned(url), ())
}

#[cfg(feature = "testing")]
@@ -770,23 +673,79 @@
let url = args.auth_endpoint.parse()?;
let api = control_plane::provider::mock::Api::new(url, !args.is_private_access_proxy);
let api = control_plane::provider::ControlPlaneBackend::PostgresMock(api);

let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());

let config = Box::leak(Box::new(auth_backend));

Ok(Either::Left(config))
auth::Backend::ControlPlane(MaybeOwned::Owned(api), ())
}
};

AuthBackendType::Web => {
let url = args.uri.parse()?;
let backend = ConsoleRedirectBackend::new(url);
let config::ConcurrencyLockOptions {
shards,
limiter,
epoch,
timeout,
} = args.connect_compute_lock.parse()?;
info!(
?limiter,
shards,
?epoch,
"Using NodeLocks (connect_compute)"
);
let connect_compute_locks = control_plane::locks::ApiLocks::new(
"connect_compute_lock",
limiter,
shards,
timeout,
epoch,
&Metrics::get().proxy.connect_compute_lock,
)?;

let config = Box::leak(Box::new(backend));
let http_config = HttpConfig {
accept_websockets: !args.is_auth_broker,
pool_options: GlobalConnPoolOptions {
max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint,
gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch,
pool_shards: args.sql_over_http.sql_over_http_pool_shards,
idle_timeout: args.sql_over_http.sql_over_http_idle_timeout,
opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
},
cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards),
client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold,
max_request_size_bytes: args.sql_over_http.sql_over_http_max_request_size_bytes,
max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes,
};
let authentication_config = AuthenticationConfig {
jwks_cache: JwkCache::default(),
thread_pool,
scram_protocol_timeout: args.scram_protocol_timeout,
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
ip_allowlist_check_enabled: !args.is_private_access_proxy,
is_auth_broker: args.is_auth_broker,
accept_jwts: args.is_auth_broker,
webauth_confirmation_timeout: args.webauth_confirmation_timeout,
};

Ok(Either::Right(config))
}
}
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
proxy_protocol_v2: args.proxy_protocol_v2,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_compute_locks,
connect_to_compute_retry_config: config::RetryConfig::parse(
&args.connect_to_compute_retry,
)?,
}));

tokio::spawn(config.connect_compute_locks.garbage_collect_worker());

Ok(config)
}

#[cfg(test)]

@@ -1,5 +1,8 @@
use crate::{
auth::backend::{jwt::JwkCache, AuthRateLimiter},
auth::{
self,
backend::{jwt::JwkCache, AuthRateLimiter},
},
control_plane::locks::ApiLocks,
rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig},
scram::threadpool::ThreadPool,
@@ -26,6 +29,7 @@ use x509_parser::oid_registry;

pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub auth_backend: auth::Backend<'static, (), ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,

@@ -1,217 +0,0 @@
use crate::auth::backend::ConsoleRedirectBackend;
use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::proxy::{
prepare_client_connection, run_until_cancelled, ClientRequestError, ErrorSource,
};
use crate::{
cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal},
context::RequestMonitoring,
error::ReportableError,
metrics::{Metrics, NumClientConnectionsGuard},
protocol2::read_proxy_protocol,
proxy::handshake::{handshake, HandshakeData},
};
use futures::TryFutureExt;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, Instrument};

use crate::proxy::{
connect_compute::{connect_to_compute, TcpMechanism},
passthrough::ProxyPassthrough,
};

pub async fn task_main(
config: &'static ProxyConfig,
backend: &'static ConsoleRedirectBackend,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
}

// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;

let connections = tokio_util::task::task_tracker::TaskTracker::new();

while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;

let conn_gauge = Metrics::get()
.proxy
.client_connections
.guard(crate::metrics::Protocol::Tcp);

let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);

tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");

connections.spawn(async move {
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
Err(e) => {
error!("per-client task finished with an error: {e:#}");
return;
}
Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
error!("missing required proxy protocol header");
return;
}
Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
error!("proxy protocol header not supported");
return;
}
Ok((socket, Some(addr))) => (socket, addr.ip()),
Ok((socket, None)) => (socket, peer_addr.ip()),
};

match socket.inner.set_nodelay(true) {
Ok(()) => {}
Err(e) => {
error!("per-client task finished with an error: failed to set socket option: {e:#}");
return;
}
};

let ctx = RequestMonitoring::new(
session_id,
peer_addr,
crate::metrics::Protocol::Tcp,
&config.region,
);
let span = ctx.span();

let startup = Box::pin(
handle_client(
config,
backend,
&ctx,
cancellation_handler,
socket,
conn_gauge,
)
.instrument(span.clone()),
);
let res = startup.await;

match res {
Err(e) => {
// todo: log and push to ctx the error kind
ctx.set_error_kind(e.get_error_kind());
error!(parent: &span, "per-client task finished with an error: {e:#}");
}
Ok(None) => {
ctx.set_success();
}
Ok(Some(p)) => {
ctx.set_success();
ctx.log_connect();
match p.proxy_pass().instrument(span.clone()).await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
}
Err(ErrorSource::Compute(e)) => {
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
}
}
}
}
});
}

connections.close();
drop(listener);

// Drain connections
connections.wait().await;

Ok(())
}

pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config: &'static ProxyConfig,
backend: &'static ConsoleRedirectBackend,
ctx: &RequestMonitoring,
cancellation_handler: Arc<CancellationHandlerMain>,
stream: S,
conn_gauge: NumClientConnectionsGuard<'static>,
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
info!(
protocol = %ctx.protocol(),
"handling interactive connection from client"
);

let metrics = &Metrics::get().proxy;
let proto = ctx.protocol();
let request_gauge = metrics.connection_requests.guard(proto);

let tls = config.tls_config.as_ref();

let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, stream, tls, record_handshake_error);
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
return Ok(cancellation_handler
.cancel_session(cancel_key_data, ctx.session_id())
.await
.map(|()| None)?)
}
};
drop(pause);

ctx.set_db_options(params.clone());

let user_info = match backend
.authenticate(ctx, &config.authentication_config, &mut stream)
.await
{
Ok(auth_result) => auth_result,
Err(e) => {
return stream.throw_error(e).await?;
}
};

let mut node = connect_to_compute(
ctx,
&TcpMechanism {
params: &params,
locks: &config.connect_compute_locks,
},
&user_info,
config.allow_self_signed_compute,
config.wake_compute_retry_config,
config.connect_to_compute_retry_config,
)
.or_else(|e| stream.throw_error(e))
.await?;

let session = cancellation_handler.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;

// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm
// driver in pipeline mode sends startup, password and first query
// immediately after opening the connection.
let (stream, read_buf) = stream.into_inner();
node.stream.write_all(&read_buf).await?;

Ok(Some(ProxyPassthrough {
client: stream,
aux: node.aux.clone(),
compute: node,
_req: request_gauge,
_conn: conn_gauge,
_cancel: session,
}))
}
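
Note: the `read_proxy_protocol` match in `task_main` above enforces the configured PROXY-header policy before trusting any peer address. A sketch of that decision table (simplified signatures, not the repo's):

    enum ProxyProtocolV2 { Required, Rejected, Supported }

    fn effective_peer(
        mode: ProxyProtocolV2,
        header_addr: Option<std::net::IpAddr>,
        socket_addr: std::net::IpAddr,
    ) -> Result<std::net::IpAddr, &'static str> {
        match (header_addr, mode) {
            (None, ProxyProtocolV2::Required) => Err("missing required proxy protocol header"),
            (Some(_), ProxyProtocolV2::Rejected) => Err("proxy protocol header not supported"),
            (Some(addr), _) => Ok(addr),   // trust the header when present
            (None, _) => Ok(socket_addr),  // fall back to the TCP peer address
        }
    }
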
@@ -81,12 +81,12 @@ pub(crate) mod errors {
Reason::EndpointNotFound => ErrorKind::User,
Reason::BranchNotFound => ErrorKind::User,
Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit,
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::Quota,
Reason::ActiveTimeQuotaExceeded => ErrorKind::Quota,
Reason::ComputeTimeQuotaExceeded => ErrorKind::Quota,
Reason::WrittenDataQuotaExceeded => ErrorKind::Quota,
Reason::DataTransferQuotaExceeded => ErrorKind::Quota,
Reason::LogicalSizeQuotaExceeded => ErrorKind::Quota,
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::User,
Reason::ActiveTimeQuotaExceeded => ErrorKind::User,
Reason::ComputeTimeQuotaExceeded => ErrorKind::User,
Reason::WrittenDataQuotaExceeded => ErrorKind::User,
Reason::DataTransferQuotaExceeded => ErrorKind::User,
Reason::LogicalSizeQuotaExceeded => ErrorKind::User,
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
Reason::RunningOperations => ErrorKind::ControlPlane,
@@ -103,7 +103,7 @@ pub(crate) mod errors {
} if error
.contains("compute time quota of non-primary branches is exceeded") =>
{
crate::error::ErrorKind::Quota
crate::error::ErrorKind::User
}
ControlPlaneError {
http_status_code: http::StatusCode::LOCKED,
@@ -112,7 +112,7 @@ pub(crate) mod errors {
} if error.contains("quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::Quota
crate::error::ErrorKind::User
}
ControlPlaneError {
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,

@@ -22,7 +22,7 @@ use futures::TryFutureExt;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{debug, info, info_span, warn, Instrument};
use tracing::{debug, error, info, info_span, warn, Instrument};

const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");

@@ -456,7 +456,7 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
});
body.http_status_code = status;

warn!("console responded with an error ({status}): {body:?}");
error!("console responded with an error ({status}): {body:?}");
Err(ApiError::ControlPlane(body))
}

@@ -49,10 +49,6 @@ pub enum ErrorKind {
#[label(rename = "serviceratelimit")]
ServiceRateLimit,

/// Proxy quota limit violation
#[label(rename = "quota")]
Quota,

/// internal errors
Service,

@@ -74,7 +70,6 @@ impl ErrorKind {
ErrorKind::ClientDisconnect => "clientdisconnect",
ErrorKind::RateLimit => "ratelimit",
ErrorKind::ServiceRateLimit => "serviceratelimit",
ErrorKind::Quota => "quota",
ErrorKind::Service => "service",
ErrorKind::ControlPlane => "controlplane",
ErrorKind::Postgres => "postgres",

@@ -95,7 +95,6 @@ pub mod cache;
pub mod cancellation;
pub mod compute;
pub mod config;
pub mod console_redirect_proxy;
pub mod context;
pub mod control_plane;
pub mod error;

@@ -35,7 +35,7 @@ use std::sync::Arc;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn, Instrument};
use tracing::{error, info, Instrument};

use self::{
connect_compute::{connect_to_compute, TcpMechanism},
@@ -61,7 +61,6 @@ pub async fn run_until_cancelled<F: std::future::Future>(

pub async fn task_main(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
@@ -96,15 +95,15 @@ pub async fn task_main(
connections.spawn(async move {
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
Err(e) => {
warn!("per-client task finished with an error: {e:#}");
error!("per-client task finished with an error: {e:#}");
return;
}
Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
warn!("missing required proxy protocol header");
error!("missing required proxy protocol header");
return;
}
Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
warn!("proxy protocol header not supported");
error!("proxy protocol header not supported");
return;
}
Ok((socket, Some(addr))) => (socket, addr.ip()),
@@ -130,7 +129,6 @@ pub async fn task_main(
let startup = Box::pin(
handle_client(
config,
auth_backend,
&ctx,
cancellation_handler,
socket,
@@ -146,7 +144,7 @@ pub async fn task_main(
Err(e) => {
// todo: log and push to ctx the error kind
ctx.set_error_kind(e.get_error_kind());
warn!(parent: &span, "per-client task finished with an error: {e:#}");
error!(parent: &span, "per-client task finished with an error: {e:#}");
}
Ok(None) => {
ctx.set_success();
@@ -157,7 +155,7 @@ pub async fn task_main(
match p.proxy_pass().instrument(span.clone()).await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
warn!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
}
Err(ErrorSource::Compute(e)) => {
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
@@ -245,10 +243,8 @@ impl ReportableError for ClientRequestError {
}
}

#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
ctx: &RequestMonitoring,
cancellation_handler: Arc<CancellationHandlerMain>,
stream: S,
@@ -289,7 +285,8 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let common_names = tls.map(|tls| &tls.common_names);

// Extract credentials which we're going to use for auth.
let result = auth_backend
let result = config
.auth_backend
.as_ref()
.map(|()| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, &params, hostname, common_names))
.transpose();
@@ -356,7 +353,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(

/// Finish client connection initialization: confirm auth success, send params, etc.
#[tracing::instrument(skip_all)]
pub(crate) async fn prepare_client_connection<P>(
async fn prepare_client_connection<P>(
node: &compute::PostgresConnection,
session: &cancellation::Session<P>,
stream: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,

@@ -71,7 +71,7 @@ impl<P, S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<P, S> {
pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
if let Err(err) = self.compute.cancel_closure.try_cancel_query().await {
tracing::warn!(?err, "could not cancel the query in the database");
tracing::error!(?err, "could not cancel the query in the database");
}
res
}

@@ -552,7 +552,7 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn

fn helper_create_connect_info(
mechanism: &TestConnectMechanism,
) -> auth::Backend<'static, ComputeCredentials> {
) -> auth::Backend<'static, ComputeCredentials, &()> {
let user_info = auth::Backend::ControlPlane(
MaybeOwned::Owned(ControlPlaneBackend::Test(Box::new(mechanism.clone()))),
ComputeCredentials {

@@ -6,7 +6,7 @@ use redis::{
ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult,
};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};

use super::elasticache::CredentialsProvider;

@@ -89,7 +89,7 @@ impl ConnectionWithCredentialsProvider {
return Ok(());
}
Err(e) => {
warn!("Error during PING: {e:?}");
error!("Error during PING: {e:?}");
}
}
} else {
@@ -121,7 +121,7 @@ impl ConnectionWithCredentialsProvider {
info!("Connection successfully established");
}
Err(e) => {
warn!("Connection is broken. Error during PING: {e:?}");
error!("Connection is broken. Error during PING: {e:?}");
}
}
self.con = Some(con);

@@ -146,7 +146,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
{
Ok(()) => {}
Err(e) => {
tracing::warn!("failed to cancel session: {e}");
tracing::error!("failed to cancel session: {e}");
}
}
}

@@ -13,7 +13,7 @@ use crate::{
check_peer_addr_is_in_list, AuthError,
},
compute,
config::ProxyConfig,
config::{AuthenticationConfig, ProxyConfig},
context::RequestMonitoring,
control_plane::{
errors::{GetAuthInfoError, WakeComputeError},
@@ -28,7 +28,7 @@ use crate::{
retry::{CouldRetry, ShouldRetryWakeCompute},
},
rate_limiter::EndpointRateLimiter,
EndpointId, Host,
Host,
};

use super::{
@@ -42,7 +42,6 @@ pub(crate) struct PoolingBackend {
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pub(crate) config: &'static ProxyConfig,
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
}

@@ -50,13 +49,18 @@ impl PoolingBackend {
pub(crate) async fn authenticate_with_password(
&self,
ctx: &RequestMonitoring,
config: &AuthenticationConfig,
user_info: &ComputeUserInfo,
password: &[u8],
) -> Result<ComputeCredentials, AuthError> {
let user_info = user_info.clone();
let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
let backend = self
.config
.auth_backend
.as_ref()
.map(|()| user_info.clone());
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if self.config.authentication_config.ip_allowlist_check_enabled
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
@@ -75,6 +79,7 @@ impl PoolingBackend {
let secret = match cached_secret.value.clone() {
Some(secret) => self.config.authentication_config.check_rate_limit(
ctx,
config,
secret,
&user_info.endpoint,
true,
@@ -86,13 +91,9 @@ impl PoolingBackend {
}
};
let ep = EndpointIdInt::from(&user_info.endpoint);
let auth_outcome = crate::auth::validate_password_and_exchange(
&self.config.authentication_config.thread_pool,
ep,
password,
secret,
)
.await?;
let auth_outcome =
crate::auth::validate_password_and_exchange(&config.thread_pool, ep, password, secret)
.await?;
let res = match auth_outcome {
crate::sasl::Outcome::Success(key) => {
info!("user successfully authenticated");
@@ -112,13 +113,13 @@ impl PoolingBackend {
pub(crate) async fn authenticate_with_jwt(
&self,
ctx: &RequestMonitoring,
config: &AuthenticationConfig,
user_info: &ComputeUserInfo,
jwt: String,
) -> Result<ComputeCredentials, AuthError> {
match &self.auth_backend {
match &self.config.auth_backend {
crate::auth::Backend::ControlPlane(console, ()) => {
self.config
.authentication_config
config
.jwks_cache
.check_jwt(
ctx,
@@ -135,10 +136,11 @@ impl PoolingBackend {
keys: crate::auth::backend::ComputeCredentialKeys::None,
})
}
crate::auth::Backend::ConsoleRedirect(_, ()) => Err(AuthError::auth_failed(
"JWT login over web auth proxy is not supported",
)),
crate::auth::Backend::Local(_) => {
let keys = self
.config
.authentication_config
let keys = config
.jwks_cache
.check_jwt(
ctx,
@@ -183,7 +185,7 @@ impl PoolingBackend {
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| keys);
let backend = self.config.auth_backend.as_ref().map(|()| keys);
crate::proxy::connect_compute::connect_to_compute(
ctx,
&TokioMechanism {
@@ -215,14 +217,14 @@ impl PoolingBackend {
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
info: ComputeUserInfo {
user: conn_info.user_info.user.clone(),
endpoint: EndpointId::from(format!("{}-local-proxy", conn_info.user_info.endpoint)),
options: conn_info.user_info.options.clone(),
},
keys: crate::auth::backend::ComputeCredentialKeys::None,
});
let backend = self
.config
.auth_backend
.as_ref()
.map(|()| ComputeCredentials {
info: conn_info.user_info.clone(),
keys: crate::auth::backend::ComputeCredentialKeys::None,
});
crate::proxy::connect_compute::connect_to_compute(
ctx,
&HyperMechanism {
@@ -260,8 +262,8 @@ impl PoolingBackend {
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");

let mut node_info = match &self.auth_backend {
auth::Backend::ControlPlane(_, ()) => {
let mut node_info = match &self.config.auth_backend {
auth::Backend::ControlPlane(_, ()) | auth::Backend::ConsoleRedirect(_, ()) => {
unreachable!("only local_proxy can connect to local postgres")
}
auth::Backend::Local(local) => local.node_info.clone(),
@@ -505,12 +507,8 @@ impl ConnectMechanism for HyperMechanism {

let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);

let port = *node_info.config.get_ports().first().ok_or_else(|| {
HttpConnError::WakeCompute(WakeComputeError::BadComputeAddress(
"local-proxy port missing on compute address".into(),
))
})?;
let res = connect_http2(&host, port, timeout).await;
// let port = node_info.config.get_ports().first().unwrap_or_else(10432);
let res = connect_http2(&host, 10432, timeout).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;


@@ -41,10 +41,6 @@ pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes,
err.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::TooManyRequests(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::TOO_MANY_REQUESTS,
),
ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::REQUEST_TIMEOUT,

@@ -48,14 +48,13 @@ use std::pin::{pin, Pin};
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn, Instrument};
use tracing::{error, info, warn, Instrument};
use utils::http::error::ApiError;

pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";

pub async fn task_main(
config: &'static ProxyConfig,
auth_backend: &'static crate::auth::Backend<'static, ()>,
ws_listener: TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
@@ -111,7 +110,6 @@ pub async fn task_main(
local_pool,
pool: Arc::clone(&conn_pool),
config,
auth_backend,
endpoint_rate_limiter: Arc::clone(&endpoint_rate_limiter),
});
let tls_acceptor: Arc<dyn MaybeTlsAcceptor> = match config.tls_config.as_ref() {
@@ -243,7 +241,7 @@ async fn connection_startup(
let (conn, peer) = match read_proxy_protocol(conn).await {
Ok(c) => c,
Err(e) => {
tracing::warn!(?session_id, %peer_addr, "failed to accept TCP connection: invalid PROXY protocol V2 header: {e:#}");
tracing::error!(?session_id, %peer_addr, "failed to accept TCP connection: invalid PROXY protocol V2 header: {e:#}");
return None;
}
};
@@ -399,7 +397,6 @@ async fn request_handler(
async move {
if let Err(e) = websocket::serve_websocket(
config,
backend.auth_backend,
ctx,
websocket,
cancellation_handler,
@@ -408,7 +405,7 @@ async fn request_handler(
)
.await
{
warn!("error in websocket connection: {e:#}");
error!("error in websocket connection: {e:#}");
}
}
.instrument(span),

@@ -45,7 +45,6 @@ use crate::auth::backend::ComputeUserInfo;
use crate::auth::endpoint_sni;
use crate::auth::ComputeUserInfoParseError;
use crate::config::AuthenticationConfig;
use crate::config::HttpConfig;
use crate::config::ProxyConfig;
use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
@@ -555,7 +554,7 @@ async fn handle_inner(

match conn_info.auth {
AuthData::Jwt(jwt) if config.authentication_config.is_auth_broker => {
handle_auth_broker_inner(ctx, request, conn_info.conn_info, jwt, backend).await
handle_auth_broker_inner(config, ctx, request, conn_info.conn_info, jwt, backend).await
}
auth => {
handle_db_inner(
@@ -623,17 +622,28 @@ async fn handle_db_inner(

let authenticate_and_connect = Box::pin(
async {
let is_local_proxy = matches!(backend.auth_backend, crate::auth::Backend::Local(_));
let is_local_proxy =
matches!(backend.config.auth_backend, crate::auth::Backend::Local(_));

let keys = match auth {
AuthData::Password(pw) => {
backend
.authenticate_with_password(ctx, &conn_info.user_info, &pw)
.authenticate_with_password(
ctx,
&config.authentication_config,
&conn_info.user_info,
&pw,
)
.await?
}
AuthData::Jwt(jwt) => {
backend
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.authenticate_with_jwt(
ctx,
&config.authentication_config,
&conn_info.user_info,
jwt,
)
.await?
}
};
@@ -681,7 +691,7 @@ async fn handle_db_inner(
// Now execute the query and return the result.
let json_output = match payload {
Payload::Single(stmt) => {
stmt.process(&config.http_config, cancel, &mut client, parsed_headers)
stmt.process(config, cancel, &mut client, parsed_headers)
.await?
}
Payload::Batch(statements) => {
@@ -699,7 +709,7 @@ async fn handle_db_inner(
}

statements
.process(&config.http_config, cancel, &mut client, parsed_headers)
.process(config, cancel, &mut client, parsed_headers)
.await?
}
};
@@ -739,6 +749,7 @@ static HEADERS_TO_FORWARD: &[&HeaderName] = &[
];

async fn handle_auth_broker_inner(
config: &'static ProxyConfig,
ctx: &RequestMonitoring,
request: Request<Incoming>,
conn_info: ConnInfo,
@@ -746,7 +757,12 @@ async fn handle_auth_broker_inner(
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
backend
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.authenticate_with_jwt(
ctx,
&config.authentication_config,
&conn_info.user_info,
jwt,
)
.await
.map_err(HttpConnError::from)?;

@@ -784,7 +800,7 @@ async fn handle_auth_broker_inner(
impl QueryData {
async fn process(
self,
config: &'static HttpConfig,
config: &'static ProxyConfig,
cancel: CancellationToken,
client: &mut Client,
parsed_headers: HttpHeaders,
@@ -815,7 +831,7 @@ impl QueryData {
Either::Right((_cancelled, query)) => {
tracing::info!("cancelling query");
if let Err(err) = cancel_token.cancel_query(NoTls).await {
tracing::warn!(?err, "could not cancel query");
tracing::error!(?err, "could not cancel query");
}
// wait for the query cancellation
match time::timeout(time::Duration::from_millis(100), query).await {
@@ -858,7 +874,7 @@ impl QueryData {
impl BatchQueryData {
async fn process(
self,
config: &'static HttpConfig,
config: &'static ProxyConfig,
cancel: CancellationToken,
client: &mut Client,
parsed_headers: HttpHeaders,
@@ -904,7 +920,7 @@ impl BatchQueryData {
}
Err(SqlOverHttpError::Cancelled(_)) => {
if let Err(err) = cancel_token.cancel_query(NoTls).await {
tracing::warn!(?err, "could not cancel query");
tracing::error!(?err, "could not cancel query");
}
// TODO: after cancelling, wait to see if we can get a status. maybe the connection is still safe.
discard.discard();
@@ -928,7 +944,7 @@ impl BatchQueryData {
}

async fn query_batch(
config: &'static HttpConfig,
config: &'static ProxyConfig,
cancel: CancellationToken,
transaction: &Transaction<'_>,
queries: BatchQueryData,
@@ -967,7 +983,7 @@ async fn query_batch(
}

async fn query_to_json<T: GenericClient>(
config: &'static HttpConfig,
config: &'static ProxyConfig,
client: &T,
data: QueryData,
current_size: &mut usize,
@@ -988,9 +1004,9 @@ async fn query_to_json<T: GenericClient>(
rows.push(row);
// we don't have a streaming response support yet so this is to prevent OOM
// from a malicious query (eg a cross join)
if *current_size > config.max_response_size_bytes {
if *current_size > config.http_config.max_response_size_bytes {
return Err(SqlOverHttpError::ResponseTooLarge(
config.max_response_size_bytes,
config.http_config.max_response_size_bytes,
));
}
}

@@ -129,7 +129,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {

pub(crate) async fn serve_websocket(
config: &'static ProxyConfig,
auth_backend: &'static crate::auth::Backend<'static, ()>,
ctx: RequestMonitoring,
websocket: OnUpgrade,
cancellation_handler: Arc<CancellationHandlerMain>,
@@ -146,7 +145,6 @@ pub(crate) async fn serve_websocket(

let res = Box::pin(handle_client(
config,
auth_backend,
&ctx,
cancellation_handler,
WebSocketRw::new(websocket),

@@ -27,7 +27,7 @@ use std::{
};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, trace, warn};
use tracing::{error, info, instrument, trace};
use utils::backoff;
use uuid::{NoContext, Timestamp};

@@ -346,7 +346,7 @@ async fn collect_metrics_iteration(
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
// Report if the metric value is suspiciously large
warn!("potentially abnormal metric value: {:?}", metric);
error!("potentially abnormal metric value: {:?}", metric);
}
}
}

@@ -15,20 +15,15 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
}
Ok(())
}
(
Scope::Admin
| Scope::PageServerApi
| Scope::GenerationsApi
| Scope::Infra
| Scope::Scrubber,
_,
) => Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Safekeeper auth",
claims.scope
)
.into(),
)),
(Scope::Admin | Scope::PageServerApi | Scope::GenerationsApi | Scope::Scrubber, _) => {
Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Safekeeper auth",
claims.scope
)
.into(),
))
}
(Scope::SafekeeperData, _) => Ok(()),
}
}

@@ -636,7 +636,7 @@ async fn handle_tenant_list(
}

async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Infra)?;
check_permissions(&req, Scope::Admin)?;

let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
@@ -1182,7 +1182,7 @@ async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, Api
/// Assumes information is only relayed to storage controller after first selecting a unique id on
/// control plane database, which means we have an id field in the request and payload.
async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Infra)?;
check_permissions(&req, Scope::Admin)?;

let body = json_request::<SafekeeperPersistence>(&mut req).await?;
let id = parse_request_param::<i64>(&req, "id")?;

@@ -246,11 +246,6 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
// storage controller's auth configuration.
ApiError::InternalServerError(anyhow::anyhow!("{node} {status}: {msg}"))
}
mgmt_api::Error::ApiError(status @ StatusCode::TOO_MANY_REQUESTS, msg) => {
// Pass through 429 errors: if pageserver is asking us to wait + retry, we in
// turn ask our clients to wait + retry
ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
}
mgmt_api::Error::ApiError(status, msg) => {
// Presume general case of pageserver API errors is that we tried to do something
// that can't be done right now.

@@ -317,8 +317,9 @@ pub async fn scan_pageserver_metadata(
tenant_timeline_results.push((ttid, data));
}

let tenant_id = tenant_id.expect("Must be set if results are present");

if !tenant_timeline_results.is_empty() {
let tenant_id = tenant_id.expect("Must be set if results are present");
analyze_tenant(
&remote_client,
tenant_id,

@@ -64,12 +64,10 @@ By default performance tests are excluded. To run them explicitly pass performan
Useful environment variables:

`NEON_BIN`: The directory where neon binaries can be found.
`COMPATIBILITY_NEON_BIN`: The directory where the previous version of Neon binaries can be found.
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
Since pageserver supports several postgres versions, `POSTGRES_DISTRIB_DIR` must contain
a subdirectory for each version with naming convention `v{PG_VERSION}/`.
Inside that dir, a `bin/postgres` binary should be present.
`COMPATIBILITY_POSTGRES_DISTRIB_DIR`: The directory where the previous version of postgres distribution can be found.
`DEFAULT_PG_VERSION`: The version of Postgres to use.
This is used to construct the full path to the postgres binaries.
Format is a 2-digit major version number, e.g. `DEFAULT_PG_VERSION=16`
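
A minimal sketch of how these variables are typically consumed; the resolution below mirrors the `pg_distrib_dir`/`neon_binpath` fixtures further down in this diff, and the default paths are examples only:

```python
import os
from pathlib import Path

# Resolve the postgres binary the same way the fixtures do:
# $POSTGRES_DISTRIB_DIR/v{DEFAULT_PG_VERSION}/bin/postgres
pg_version = os.environ.get("DEFAULT_PG_VERSION", "16")
distrib_dir = Path(os.environ.get("POSTGRES_DISTRIB_DIR", "pg_install"))
postgres_bin = distrib_dir / f"v{pg_version}" / "bin" / "postgres"
assert postgres_bin.exists(), f"postgres binary not found at '{postgres_bin}'"
```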
@@ -296,16 +294,6 @@ def test_foobar2(neon_env_builder: NeonEnvBuilder):
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
```

All the tests which rely on NeonEnvBuilder can check the various version combinations of the components.
To do this, you may want to add the parametrize decorator with the function `fixtures.utils.allpairs_versions()`.
E.g.

```python
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_something(
...
```

For more information about pytest fixtures, see https://docs.pytest.org/en/stable/fixture.html

At the end of a test, all the nodes in the environment are automatically stopped, so you

@@ -6,7 +6,6 @@ pytest_plugins = (
"fixtures.httpserver",
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.paths",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

@@ -6,8 +6,6 @@ from enum import Enum
from functools import total_ordering
from typing import TYPE_CHECKING, TypeVar

from typing_extensions import override

if TYPE_CHECKING:
from typing import Any, Union

@@ -33,36 +31,33 @@ class Lsn:
self.lsn_int = (int(left, 16) << 32) + int(right, 16)
assert 0 <= self.lsn_int <= 0xFFFFFFFF_FFFFFFFF

@override
def __str__(self) -> str:
"""Convert lsn from int to standard hex notation."""
return f"{(self.lsn_int >> 32):X}/{(self.lsn_int & 0xFFFFFFFF):X}"

@override
def __repr__(self) -> str:
return f'Lsn("{str(self)}")'

def __int__(self) -> int:
return self.lsn_int

def __lt__(self, other: object) -> bool:
def __lt__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
return NotImplemented
return self.lsn_int < other.lsn_int

def __gt__(self, other: object) -> bool:
def __gt__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
raise NotImplementedError
return self.lsn_int > other.lsn_int

@override
def __eq__(self, other: object) -> bool:
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
return NotImplemented
return self.lsn_int == other.lsn_int

# Returns the difference between two Lsns, in bytes
def __sub__(self, other: object) -> int:
def __sub__(self, other: Any) -> int:
if not isinstance(other, Lsn):
return NotImplemented
return self.lsn_int - other.lsn_int
@@ -75,7 +70,6 @@ class Lsn:
else:
raise NotImplementedError

@override
def __hash__(self) -> int:
return hash(self.lsn_int)
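
For reference, a small usage sketch of the `Lsn` helper above, assuming it is importable from `fixtures.common_types` (the module another hunk in this diff imports `TenantId` from); the LSN values are arbitrary examples:

```python
from fixtures.common_types import Lsn

a = Lsn("0/16B3748")
b = Lsn("0/16B3800")
assert b > a
assert b - a == 0xB8  # __sub__ returns the difference in bytes
assert str(a) == "0/16B3748"  # round-trips through the "{hi:X}/{lo:X}" notation
```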

@@ -122,22 +116,19 @@ class Id:
self.id = bytearray.fromhex(x)
assert len(self.id) == 16

@override
def __str__(self) -> str:
return self.id.hex()

def __lt__(self, other: object) -> bool:
def __lt__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self.id < other.id

@override
def __eq__(self, other: object) -> bool:
def __eq__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self.id == other.id

@override
def __hash__(self) -> int:
return hash(str(self.id))

@@ -148,31 +139,25 @@ class Id:


class TenantId(Id):
@override
def __repr__(self) -> str:
return f'`TenantId("{self.id.hex()}")'

@override
def __str__(self) -> str:
return self.id.hex()


class NodeId(Id):
@override
def __repr__(self) -> str:
return f'`NodeId("{self.id.hex()}")'

@override
def __str__(self) -> str:
return self.id.hex()


class TimelineId(Id):
@override
def __repr__(self) -> str:
return f'TimelineId("{self.id.hex()}")'

@override
def __str__(self) -> str:
return self.id.hex()

@@ -202,7 +187,7 @@ class TenantShardId:
assert self.shard_number < self.shard_count or self.shard_count == 0

@classmethod
def parse(cls: type[TTenantShardId], input: str) -> TTenantShardId:
def parse(cls: type[TTenantShardId], input) -> TTenantShardId:
if len(input) == 32:
return cls(
tenant_id=TenantId(input),
@@ -218,7 +203,6 @@ class TenantShardId:
else:
raise ValueError(f"Invalid TenantShardId '{input}'")

@override
def __str__(self):
if self.shard_count > 0:
return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"
@@ -226,25 +210,22 @@ class TenantShardId:
# Unsharded case: equivalent of Rust TenantShardId::unsharded(tenant_id)
return str(self.tenant_id)

@override
def __repr__(self):
return self.__str__()

def _tuple(self) -> tuple[TenantId, int, int]:
return (self.tenant_id, self.shard_number, self.shard_count)

def __lt__(self, other: object) -> bool:
def __lt__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self._tuple() < other._tuple()

@override
def __eq__(self, other: object) -> bool:
def __eq__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self._tuple() == other._tuple()

@override
def __hash__(self) -> int:
return hash(self._tuple())
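
A hedged usage sketch for `TenantShardId.parse` and its `__str__` counterpart. The sharded-suffix branch of `parse` is elided from the hunk above, so the handling of the `-0104` suffix is an assumption based on the `__str__` format string; the tenant id is a made-up example:

```python
from fixtures.common_types import TenantShardId

# 32 hex chars: unsharded, equivalent of Rust TenantShardId::unsharded(tenant_id)
unsharded = TenantShardId.parse("3fa85f6457174562b3fc2c963f66afa6")
assert str(unsharded) == "3fa85f6457174562b3fc2c963f66afa6"

# "{tenant_id}-{shard_number:02x}{shard_count:02x}": shard 1 of 4 (assumed parse symmetry)
sharded = TenantShardId.parse("3fa85f6457174562b3fc2c963f66afa6-0104")
assert (sharded.shard_number, sharded.shard_count) == (1, 4)
```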

@@ -8,11 +8,9 @@ from contextlib import _GeneratorContextManager, contextmanager

# Type-related stuff
from pathlib import Path
from typing import TYPE_CHECKING

import pytest
from _pytest.fixtures import FixtureRequest
from typing_extensions import override

from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
@@ -26,9 +24,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pg_stats import PgStatTable

if TYPE_CHECKING:
from collections.abc import Iterator


class PgCompare(ABC):
"""Common interface of all postgres implementations, useful for benchmarks.
@@ -70,12 +65,12 @@ class PgCompare(ABC):

@contextmanager
@abstractmethod
def record_pageserver_writes(self, out_name: str):
def record_pageserver_writes(self, out_name):
pass

@contextmanager
@abstractmethod
def record_duration(self, out_name: str):
def record_duration(self, out_name):
pass

@contextmanager
@@ -127,34 +122,28 @@ class NeonCompare(PgCompare):
self._pg = self.env.endpoints.create_start("main", "main", self.tenant)

@property
@override
def pg(self) -> PgProtocol:
return self._pg

@property
@override
def zenbenchmark(self) -> NeonBenchmarker:
return self._zenbenchmark

@property
@override
def pg_bin(self) -> PgBin:
return self._pg_bin

@override
def flush(self, compact: bool = True, gc: bool = True):
wait_for_last_flush_lsn(self.env, self._pg, self.tenant, self.timeline)
self.pageserver_http_client.timeline_checkpoint(self.tenant, self.timeline, compact=compact)
if gc:
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)

@override
def compact(self):
self.pageserver_http_client.timeline_compact(
self.tenant, self.timeline, wait_until_uploaded=True
)

@override
def report_peak_memory_use(self):
self.zenbenchmark.record(
"peak_mem",
@@ -163,7 +152,6 @@ class NeonCompare(PgCompare):
report=MetricReport.LOWER_IS_BETTER,
)

@override
def report_size(self):
timeline_size = self.zenbenchmark.get_timeline_size(
self.env.repo_dir, self.tenant, self.timeline
@@ -197,11 +185,9 @@ class NeonCompare(PgCompare):
"num_files_uploaded", total_files, "", report=MetricReport.LOWER_IS_BETTER
)

@override
def record_pageserver_writes(self, out_name: str) -> _GeneratorContextManager[None]:
return self.zenbenchmark.record_pageserver_writes(self.env.pageserver, out_name)

@override
def record_duration(self, out_name: str) -> _GeneratorContextManager[None]:
return self.zenbenchmark.record_duration(out_name)

@@ -225,33 +211,26 @@ class VanillaCompare(PgCompare):
self.cur = self.conn.cursor()

@property
@override
def pg(self) -> VanillaPostgres:
return self._pg

@property
@override
def zenbenchmark(self) -> NeonBenchmarker:
return self._zenbenchmark

@property
@override
def pg_bin(self) -> PgBin:
return self._pg.pg_bin

@override
def flush(self, compact: bool = False, gc: bool = False):
self.cur.execute("checkpoint")

@override
def compact(self):
pass

@override
def report_peak_memory_use(self):
pass # TODO find something

@override
def report_size(self):
data_size = self.pg.get_subdir_size(Path("base"))
self.zenbenchmark.record(
@@ -266,7 +245,6 @@ class VanillaCompare(PgCompare):
def record_pageserver_writes(self, out_name: str) -> Iterator[None]:
yield # Do nothing

@override
def record_duration(self, out_name: str) -> _GeneratorContextManager[None]:
return self.zenbenchmark.record_duration(out_name)

@@ -283,35 +261,28 @@ class RemoteCompare(PgCompare):
self.cur = self.conn.cursor()

@property
@override
def pg(self) -> PgProtocol:
return self._pg

@property
@override
def zenbenchmark(self) -> NeonBenchmarker:
return self._zenbenchmark

@property
@override
def pg_bin(self) -> PgBin:
return self._pg.pg_bin

@override
def flush(self, compact: bool = False, gc: bool = False):
def flush(self):
# TODO: flush the remote pageserver
pass

@override
def compact(self):
pass

@override
def report_peak_memory_use(self):
# TODO: get memory usage from remote pageserver
pass

@override
def report_size(self):
# TODO: get storage size from remote pageserver
pass
@@ -320,7 +291,6 @@ class RemoteCompare(PgCompare):
def record_pageserver_writes(self, out_name: str) -> Iterator[None]:
yield # Do nothing

@override
def record_duration(self, out_name: str) -> _GeneratorContextManager[None]:
return self.zenbenchmark.record_duration(out_name)

@@ -1,31 +1,27 @@
from __future__ import annotations

import concurrent.futures
from typing import TYPE_CHECKING
from typing import Any

import pytest
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

from fixtures.common_types import TenantId
from fixtures.log_helper import log

if TYPE_CHECKING:
from typing import Any, Callable, Optional


class ComputeReconfigure:
def __init__(self, server: HTTPServer):
def __init__(self, server):
self.server = server
self.control_plane_compute_hook_api = f"http://{server.host}:{server.port}/notify-attach"
self.workloads: dict[TenantId, Any] = {}
self.on_notify: Optional[Callable[[Any], None]] = None
self.workloads = {}
self.on_notify = None

def register_workload(self, workload: Any):
def register_workload(self, workload):
self.workloads[workload.tenant_id] = workload

def register_on_notify(self, fn: Optional[Callable[[Any], None]]):
def register_on_notify(self, fn):
"""
Add some extra work during a notification, like sleeping to slow things down, or
logging what was notified.
@@ -34,7 +30,7 @@ class ComputeReconfigure:


@pytest.fixture(scope="function")
def compute_reconfigure_listener(make_httpserver: HTTPServer):
def compute_reconfigure_listener(make_httpserver):
"""
This fixture exposes an HTTP listener for the storage controller to submit
compute notifications to us, instead of updating neon_local endpoints itself.
@@ -52,7 +48,7 @@ def compute_reconfigure_listener(make_httpserver: HTTPServer):
# accept a healthy rate of calls into notify-attach.
reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)

def handler(request: Request) -> Response:
def handler(request: Request):
assert request.json is not None
body: dict[str, Any] = request.json
log.info(f"notify-attach request: {body}")
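
A hypothetical test using this fixture, registering an `on_notify` hook as described in the docstring above; the hook body is illustrative:

```python
import time

from fixtures.log_helper import log

def test_with_notify_hook(compute_reconfigure_listener):
    def on_notify(body):
        time.sleep(0.5)  # e.g. slow notifications down to widen race windows
        log.info(f"saw notify-attach: {body}")

    compute_reconfigure_listener.register_on_notify(on_notify)
    # ... drive a tenant migration; the storage controller posts to
    # compute_reconfigure_listener.control_plane_compute_hook_api ...
```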

@@ -14,10 +14,8 @@ from allure_pytest.utils import allure_name, allure_suite_labels
from fixtures.log_helper import log

if TYPE_CHECKING:
from collections.abc import MutableMapping
from typing import Any


"""
The plugin reruns flaky tests.
It uses `pytest.mark.flaky` provided by `pytest-rerunfailures` plugin and flaky tests detected by `scripts/flaky_tests.py`

@@ -1,15 +1,8 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import pytest
from pytest_httpserver import HTTPServer

if TYPE_CHECKING:
from collections.abc import Iterator

from fixtures.port_distributor import PortDistributor

# TODO: mypy fails with:
# Module "fixtures.neon_fixtures" does not explicitly export attribute "PortDistributor" [attr-defined]
# from fixtures.neon_fixtures import PortDistributor
@@ -24,7 +17,7 @@ def httpserver_ssl_context():


@pytest.fixture(scope="function")
def make_httpserver(httpserver_listen_address, httpserver_ssl_context) -> Iterator[HTTPServer]:
def make_httpserver(httpserver_listen_address, httpserver_ssl_context):
host, port = httpserver_listen_address
if not host:
host = HTTPServer.DEFAULT_LISTEN_HOST
@@ -40,13 +33,13 @@ def make_httpserver(httpserver_listen_address, httpserver_ssl_context) -> Iterat


@pytest.fixture(scope="function")
def httpserver(make_httpserver: HTTPServer) -> Iterator[HTTPServer]:
def httpserver(make_httpserver):
server = make_httpserver
yield server
server.clear()


@pytest.fixture(scope="function")
def httpserver_listen_address(port_distributor: PortDistributor) -> tuple[str, int]:
def httpserver_listen_address(port_distributor) -> tuple[str, int]:
port = port_distributor.get_port()
return ("localhost", port)
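
A minimal sketch of a test consuming the `httpserver` fixture above; the route and payload are invented for illustration, while `expect_request`/`respond_with_json` are standard `pytest_httpserver` API:

```python
def test_with_stub_http_server(httpserver):
    httpserver.expect_request("/status").respond_with_json({"ok": True})
    url = f"http://{httpserver.host}:{httpserver.port}/status"
    # point the component under test at `url`; server.clear() runs on teardown
```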

@@ -31,7 +31,7 @@ LOGGING = {
}


def getLogger(name: str = "root") -> logging.Logger:
def getLogger(name="root") -> logging.Logger:
"""Method to get logger for tests.

Should be used to get correctly initialized logger."""

@@ -22,7 +22,7 @@ class Metrics:

def query_all(self, name: str, filter: Optional[dict[str, str]] = None) -> list[Sample]:
filter = filter or {}
res: list[Sample] = []
res = []

for sample in self.metrics[name]:
try:
@@ -59,7 +59,7 @@ class MetricsGetter:
return results[0].value

def get_metrics_values(
self, names: list[str], filter: Optional[dict[str, str]] = None, absence_ok: bool = False
self, names: list[str], filter: Optional[dict[str, str]] = None, absence_ok=False
) -> dict[str, float]:
"""
When fetching multiple named metrics, it is more efficient to use this

@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, cast
import requests

if TYPE_CHECKING:
from typing import Any, Literal, Optional
from typing import Any, Literal, Optional, Union

from fixtures.pg_version import PgVersion

@@ -25,7 +25,9 @@ class NeonAPI:
self.__neon_api_key = neon_api_key
self.__neon_api_base_url = neon_api_base_url.strip("/")

def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response:
def __request(
self, method: Union[str, bytes], endpoint: str, **kwargs: Any
) -> requests.Response:
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
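
The same bearer-header injection pattern in isolation, as a hedged sketch; the base URL and endpoint below are placeholders, not documented routes:

```python
import requests

def get_with_bearer(base_url: str, api_key: str, endpoint: str, **kwargs) -> requests.Response:
    headers = kwargs.setdefault("headers", {})
    headers["Authorization"] = f"Bearer {api_key}"
    return requests.get(base_url.strip("/") + endpoint, **kwargs)

# resp = get_with_bearer("https://console.neon.tech/api/v2/", api_key, "/projects")
```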

@@ -18,6 +18,7 @@ from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from fcntl import LOCK_EX, LOCK_UN, flock
from functools import cached_property
from pathlib import Path
from types import TracebackType
@@ -58,7 +59,6 @@ from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
)
from fixtures.paths import get_test_repo_dir, shared_snapshot_dir
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
@@ -75,8 +75,8 @@ from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
COMPONENT_BINARIES,
allure_add_grafana_links,
allure_attach_from_dir,
assert_no_errors,
get_dir_size,
print_gc_result,
@@ -96,8 +96,6 @@ if TYPE_CHECKING:
Union,
)

from fixtures.paths import SnapshotDirLocked

T = TypeVar("T")


@@ -120,11 +118,65 @@ put directly-importable functions into utils.py or another separate file.

Env = dict[str, str]

DEFAULT_OUTPUT_DIR: str = "test_output"
DEFAULT_BRANCH_NAME: str = "main"

BASE_PORT: int = 15000


@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = Path(__file__).parents[2]
log.info(f"base_dir is {base_dir}")

yield base_dir


@pytest.fixture(scope="function")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return

# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")

if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")

yield binpath
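
A hypothetical test consuming the fixture above; the `pageserver` existence check mirrors the assertion inside the fixture itself:

```python
from pathlib import Path

def test_binaries_present(neon_binpath: Path):
    assert (neon_binpath / "pageserver").exists()
```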


@pytest.fixture(scope="session")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"

log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir


@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)

log.info(f"top_output_dir is {output_dir}")
yield output_dir


@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
@@ -317,14 +369,11 @@ class NeonEnvBuilder:
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
neon_binpath: Path,
compatibility_neon_binpath: Path,
pg_distrib_dir: Path,
compatibility_pg_distrib_dir: Path,
pg_version: PgVersion,
test_name: str,
top_output_dir: Path,
test_output_dir: Path,
combination,
test_overlay_dir: Optional[Path] = None,
pageserver_remote_storage: Optional[RemoteStorage] = None,
# toml that will be decomposed into `--config-override` flags during `pageserver --init`
@@ -406,19 +455,6 @@ class NeonEnvBuilder:
"test_"
), "Unexpectedly instantiated from outside a test function"
self.test_name = test_name
self.compatibility_neon_binpath = compatibility_neon_binpath
self.compatibility_pg_distrib_dir = compatibility_pg_distrib_dir
self.version_combination = combination
self.mixdir = self.test_output_dir / "mixdir_neon"
if self.version_combination is not None:
assert (
self.compatibility_neon_binpath is not None
), "the environment variable COMPATIBILITY_NEON_BIN is required when using mixed versions"
assert (
self.compatibility_pg_distrib_dir is not None
), "the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required when using mixed versions"
self.mixdir.mkdir(mode=0o755, exist_ok=True)
self._mix_versions()

def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -619,21 +655,6 @@ class NeonEnvBuilder:

return self.env

def _mix_versions(self):
assert self.version_combination is not None, "version combination must be set"
for component, paths in COMPONENT_BINARIES.items():
directory = (
self.neon_binpath
if self.version_combination[component] == "new"
else self.compatibility_neon_binpath
)
for filename in paths:
destination = self.mixdir / filename
destination.symlink_to(directory / filename)
if self.version_combination["compute"] == "old":
self.pg_distrib_dir = self.compatibility_pg_distrib_dir
self.neon_binpath = self.mixdir
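
For context, a hedged illustration of the combination mapping `_mix_versions` consumes; only the `"compute"` key is visible in the hunk above, so the other component names are assumptions keyed off `COMPONENT_BINARIES`:

```python
# Each component resolves to "new" (self.neon_binpath) or "old"
# (self.compatibility_neon_binpath); binaries are symlinked into mixdir.
combination = {
    "pageserver": "new",  # assumed component name
    "safekeeper": "old",  # assumed component name
    "compute": "old",     # confirmed by the hunk above
}
```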

def overlay_mount(self, ident: str, srcdir: Path, dstdir: Path):
"""
Mount `srcdir` as an overlayfs mount at `dstdir`.
@@ -1382,9 +1403,7 @@ def neon_simple_env(
top_output_dir: Path,
test_output_dir: Path,
neon_binpath: Path,
compatibility_neon_binpath: Path,
pg_distrib_dir: Path,
compatibility_pg_distrib_dir: Path,
pg_version: PgVersion,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
@@ -1399,11 +1418,6 @@ def neon_simple_env(

# Create the environment in the per-test output directory
repo_dir = get_test_repo_dir(request, top_output_dir)
combination = (
request._pyfuncitem.callspec.params["combination"]
if "combination" in request._pyfuncitem.callspec.params
else None
)

with NeonEnvBuilder(
top_output_dir=top_output_dir,
@@ -1411,9 +1425,7 @@ def neon_simple_env(
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
compatibility_neon_binpath=compatibility_neon_binpath,
pg_distrib_dir=pg_distrib_dir,
compatibility_pg_distrib_dir=compatibility_pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
@@ -1423,7 +1435,6 @@ def neon_simple_env(
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
combination=combination,
) as builder:
env = builder.init_start()

@@ -1437,9 +1448,7 @@ def neon_env_builder(
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
neon_binpath: Path,
compatibility_neon_binpath: Path,
pg_distrib_dir: Path,
compatibility_pg_distrib_dir: Path,
pg_version: PgVersion,
run_id: uuid.UUID,
request: FixtureRequest,
@@ -1466,11 +1475,6 @@ def neon_env_builder(

# Create the environment in the test-specific output dir
repo_dir = os.path.join(test_output_dir, "repo")
combination = (
request._pyfuncitem.callspec.params["combination"]
if "combination" in request._pyfuncitem.callspec.params
else None
)

# Return the builder to the caller
with NeonEnvBuilder(
@@ -1479,10 +1483,7 @@ def neon_env_builder(
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
compatibility_neon_binpath=compatibility_neon_binpath,
pg_distrib_dir=pg_distrib_dir,
compatibility_pg_distrib_dir=compatibility_pg_distrib_dir,
combination=combination,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
@@ -1986,11 +1987,11 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"reconcile_all waited for {n} shards")
return n

def reconcile_until_idle(self, timeout_secs=30, max_interval=5):
def reconcile_until_idle(self, timeout_secs=30):
start_at = time.time()
n = 1
delay_sec = 0.1
delay_max = max_interval
delay_sec = 0.5
delay_max = 5
while n > 0:
n = self.reconcile_all()
if n == 0:
@@ -3656,7 +3657,7 @@ class Endpoint(PgProtocol, LogUtils):
config_lines: Optional[list[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
allow_multiple: bool = False,
allow_multiple=False,
basebackup_request_tries: Optional[int] = None,
) -> Endpoint:
"""
@@ -3997,7 +3998,7 @@ class Safekeeper(LogUtils):
def timeline_dir(self, tenant_id, timeline_id) -> Path:
return self.data_dir / str(tenant_id) / str(timeline_id)

# list partial uploaded segments of this safekeeper. Works only for
# List partial uploaded segments of this safekeeper. Works only for
# RemoteStorageKind.LOCAL_FS.
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
tline_path = (
@@ -4245,6 +4246,44 @@ class StorageScrubber:
raise


def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) -> Path:
"""Compute the path to a working directory for an individual test."""
test_name = request.node.name
test_dir = top_output_dir / f"{prefix}{test_name.replace('/', '-')}"

# We rerun flaky tests multiple times, use a separate directory for each run.
if (suffix := getattr(request.node, "execution_count", None)) is not None:
test_dir = test_dir.parent / f"{test_dir.name}-{suffix}"

log.info(f"get_test_output_dir is {test_dir}")
# make mypy happy
assert isinstance(test_dir, Path)
return test_dir


def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
The working directory for a test.
"""
return _get_test_dir(request, top_output_dir, "")


def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
Directory that contains `upperdir` and `workdir` for overlayfs mounts
that a test creates. See `NeonEnvBuilder.overlay_mount`.
"""
return _get_test_dir(request, top_output_dir, "overlay-")


def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
return top_output_dir / "shared-snapshots" / snapshot_name


def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return get_test_output_dir(request, top_output_dir) / "repo"
|
||||
|
||||
|
||||
def pytest_addoption(parser: Parser):
    parser.addoption(
        "--preserve-database-files",
@@ -4254,11 +4293,154 @@ def pytest_addoption(parser: Parser):
    )


SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile(
SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile(  # type: ignore[type-arg]
    r"config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)"
)

# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="function", autouse=True)
def test_output_dir(
    request: FixtureRequest, top_output_dir: Path, test_overlay_dir: Path
) -> Iterator[Path]:
    """Create the working directory for an individual test."""

    # one directory per test
    test_dir = get_test_output_dir(request, top_output_dir)
    log.info(f"test_output_dir is {test_dir}")
    shutil.rmtree(test_dir, ignore_errors=True)
    test_dir.mkdir()

    yield test_dir

    # Allure artifacts creation might involve the creation of `.tar.zst` archives,
    # which aren't going to be used if Allure results collection is not enabled
    # (i.e. --alluredir is not set).
    # Skip `allure_attach_from_dir` in this case
    if not request.config.getoption("--alluredir"):
        return

    preserve_database_files = False
    for k, v in request.node.user_properties:
        # NB: the neon_env_builder fixture uses this fixture (test_output_dir).
        # So, neon_env_builder's cleanup runs before here.
        # The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
        if k == "preserve_database_files":
            assert isinstance(v, bool)
            preserve_database_files = v

    allure_attach_from_dir(test_dir, preserve_database_files)

class FileAndThreadLock:
    def __init__(self, path: Path):
        self.path = path
        self.thread_lock = threading.Lock()
        self.fd: Optional[int] = None

    def __enter__(self):
        self.fd = os.open(self.path, os.O_CREAT | os.O_WRONLY)
        # lock thread lock before file lock so that there's no race
        # around flocking / funlocking the file lock
        self.thread_lock.acquire()
        flock(self.fd, LOCK_EX)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        assert self.fd is not None
        assert self.thread_lock.locked()  # ... by us
        flock(self.fd, LOCK_UN)
        self.thread_lock.release()
        os.close(self.fd)
        self.fd = None

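The two-level locking exists because `flock` excludes other processes (e.g. pytest-xdist workers) but not threads sharing one process, while `threading.Lock` excludes only threads within the current process; taking the thread lock first keeps the flock/funlock calls on the shared fd race-free. A minimal usage sketch (the path is hypothetical):

    lock = FileAndThreadLock(Path("/tmp/shared-snapshots/initializing.flock"))
    with lock:
        pass  # at most one thread in one process runs this critical section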
class SnapshotDirLocked:
    def __init__(self, parent: SnapshotDir):
        self._parent = parent

    def is_initialized(self):
        # TODO: in the future, take a `tag` as argument and store it in the marker in set_initialized.
        # Then, in this function, compare marker file contents with the tag to invalidate the snapshot if the tag changed.
        return self._parent._marker_file_path.exists()

    def set_initialized(self):
        self._parent._marker_file_path.write_text("")

    @property
    def path(self) -> Path:
        return self._parent._path / "snapshot"


class SnapshotDir:
    _path: Path

    def __init__(self, path: Path):
        self._path = path
        assert self._path.is_dir()
        self._lock = FileAndThreadLock(self._lock_file_path)

    @property
    def _lock_file_path(self) -> Path:
        return self._path / "initializing.flock"

    @property
    def _marker_file_path(self) -> Path:
        return self._path / "initialized.marker"

    def __enter__(self) -> SnapshotDirLocked:
        self._lock.__enter__()
        return SnapshotDirLocked(self)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self._lock.__exit__(exc_type, exc_value, exc_traceback)


def shared_snapshot_dir(top_output_dir, ident: str) -> SnapshotDir:
    snapshot_dir_path = get_shared_snapshot_dir_path(top_output_dir, ident)
    snapshot_dir_path.mkdir(exist_ok=True, parents=True)
    return SnapshotDir(snapshot_dir_path)

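A usage sketch of the snapshot protocol above (`populate` and `restore` are hypothetical helpers): the first caller to take the lock builds the snapshot, later callers see the marker file and reuse it.

    with shared_snapshot_dir(top_output_dir, "pgbench-snapshot") as locked:
        if not locked.is_initialized():
            populate(locked.path)  # hypothetical: build the snapshot contents
            locked.set_initialized()
        restore(locked.path)  # hypothetical: copy or overlay-mount into the test dir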
@pytest.fixture(scope="function")
|
||||
def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
|
||||
"""
|
||||
Idempotently create a test's overlayfs mount state directory.
|
||||
If the functionality isn't enabled via env var, returns None.
|
||||
|
||||
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
|
||||
"""
|
||||
|
||||
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
|
||||
return None
|
||||
|
||||
overlay_dir = get_test_overlay_dir(request, top_output_dir)
|
||||
log.info(f"test_overlay_dir is {overlay_dir}")
|
||||
|
||||
overlay_dir.mkdir(exist_ok=True)
|
||||
# unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
|
||||
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
|
||||
cmd = ["sudo", "umount", str(mountpoint)]
|
||||
log.info(
|
||||
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
|
||||
)
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
# the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work.
|
||||
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
|
||||
overlay_dir.mkdir()
|
||||
|
||||
return overlay_dir
|
||||
|
||||
# no need to clean up anything: on clean shutdown,
|
||||
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
|
||||
# and on unclean shutdown, this function will take care of it
|
||||
# on the next test run
|
||||
|
||||
|
||||
SKIP_DIRS = frozenset(
    (
        "pg_wal",
@@ -4280,7 +4462,6 @@ SKIP_FILES = frozenset(
        "postmaster.opts",
        "postmaster.pid",
        "pg_control",
        "pg_dynshmem",
    )
)

@@ -1,13 +1,10 @@
from __future__ import annotations

from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING

import psutil

if TYPE_CHECKING:
    from collections.abc import Iterator


def iter_mounts_beneath(topdir: Path) -> Iterator[Path]:
    """
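The docstring and body are cut off by the hunk. Given the `psutil` import above, a plausible implementation (a sketch, not the verbatim code) filters the system mount table for mountpoints strictly beneath `topdir`:

    def iter_mounts_beneath(topdir: Path) -> Iterator[Path]:
        """Yield mountpoints located beneath `topdir` (assumed implementation)."""
        for part in psutil.disk_partitions(all=True):
            mountpoint = Path(part.mountpoint)
            if topdir in mountpoint.parents:
                yield mountpoint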
@@ -886,7 +886,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
        self,
        tenant_id: Union[TenantId, TenantShardId],
        timeline_id: TimelineId,
        batch_size: Optional[int] = None,
        batch_size: int | None = None,
        **kwargs,
    ) -> set[TimelineId]:
        params = {}

@@ -9,12 +9,7 @@ import toml
from _pytest.python import Metafunc

from fixtures.pg_version import PgVersion

if TYPE_CHECKING:
    from typing import Any, Optional

    from fixtures.utils import AuxFileStore

from fixtures.utils import AuxFileStore

if TYPE_CHECKING:
    from typing import Any, Optional

@@ -1,312 +0,0 @@
from __future__ import annotations

import os
import shutil
import subprocess
import threading
from fcntl import LOCK_EX, LOCK_UN, flock
from pathlib import Path
from types import TracebackType
from typing import TYPE_CHECKING

import pytest
from pytest import FixtureRequest

from fixtures import overlayfs
from fixtures.log_helper import log
from fixtures.utils import allure_attach_from_dir

if TYPE_CHECKING:
    from collections.abc import Iterator
    from typing import Optional


DEFAULT_OUTPUT_DIR: str = "test_output"


def get_test_dir(
    request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
    """Compute the path to a working directory for an individual test."""
    test_name = request.node.name
    test_dir = top_output_dir / f"{prefix or ''}{test_name.replace('/', '-')}"

    # We rerun flaky tests multiple times, use a separate directory for each run.
    if (suffix := getattr(request.node, "execution_count", None)) is not None:
        test_dir = test_dir.parent / f"{test_dir.name}-{suffix}"

    return test_dir

def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
    """
    The working directory for a test.
    """
    return get_test_dir(request, top_output_dir)


def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
    """
    Directory that contains `upperdir` and `workdir` for overlayfs mounts
    that a test creates. See `NeonEnvBuilder.overlay_mount`.
    """
    return get_test_dir(request, top_output_dir, "overlay-")


def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
    return top_output_dir / "shared-snapshots" / snapshot_name


def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
    return get_test_output_dir(request, top_output_dir) / "repo"


@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
    # find the base directory (currently this is the git root)
    base_dir = Path(__file__).parents[2]
    log.info(f"base_dir is {base_dir}")

    yield base_dir


@pytest.fixture(scope="session")
def compute_config_dir(base_dir: Path) -> Iterator[Path]:
    """
    Retrieve the path to the compute configuration directory.
    """
    yield base_dir / "compute" / "etc"


@pytest.fixture(scope="function")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
    if os.getenv("REMOTE_ENV"):
        # we are in remote env and do not have neon binaries locally
        # this is the case for benchmarks run on self-hosted runner
        return

    # Find the neon binaries.
    if env_neon_bin := os.environ.get("NEON_BIN"):
        binpath = Path(env_neon_bin)
    else:
        binpath = base_dir / "target" / build_type
    log.info(f"neon_binpath is {binpath}")

    if not (binpath / "pageserver").exists():
        raise Exception(f"neon binaries not found at '{binpath}'")

    yield binpath.absolute()


@pytest.fixture(scope="session")
def compatibility_snapshot_dir() -> Iterator[Path]:
    if os.getenv("REMOTE_ENV"):
        return
    compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
    assert (
        compatibility_snapshot_dir_env is not None
    ), "COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg(PG_VERSION)` path generated by test_create_snapshot (ideally generated by the previous version of Neon)"
    compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
    yield compatibility_snapshot_dir


@pytest.fixture(scope="session")
def compatibility_neon_binpath() -> Optional[Iterator[Path]]:
    if os.getenv("REMOTE_ENV"):
        return
    comp_binpath = None
    if env_compatibility_neon_binpath := os.environ.get("COMPATIBILITY_NEON_BIN"):
        comp_binpath = Path(env_compatibility_neon_binpath).resolve().absolute()
    yield comp_binpath

@pytest.fixture(scope="session")
|
||||
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
|
||||
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
|
||||
distrib_dir = Path(env_postgres_bin).resolve()
|
||||
else:
|
||||
distrib_dir = base_dir / "pg_install"
|
||||
|
||||
log.info(f"pg_distrib_dir is {distrib_dir}")
|
||||
yield distrib_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def compatibility_pg_distrib_dir() -> Optional[Iterator[Path]]:
|
||||
compat_distrib_dir = None
|
||||
if env_compat_postgres_bin := os.environ.get("COMPATIBILITY_POSTGRES_DISTRIB_DIR"):
|
||||
compat_distrib_dir = Path(env_compat_postgres_bin).resolve()
|
||||
if not compat_distrib_dir.exists():
|
||||
raise Exception(f"compatibility postgres directory not found at {compat_distrib_dir}")
|
||||
|
||||
if compat_distrib_dir:
|
||||
log.info(f"compatibility_pg_distrib_dir is {compat_distrib_dir}")
|
||||
yield compat_distrib_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def top_output_dir(base_dir: Path) -> Iterator[Path]:
|
||||
# Compute the top-level directory for all tests.
|
||||
if env_test_output := os.environ.get("TEST_OUTPUT"):
|
||||
output_dir = Path(env_test_output).resolve()
|
||||
else:
|
||||
output_dir = base_dir / DEFAULT_OUTPUT_DIR
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
|
||||
log.info(f"top_output_dir is {output_dir}")
|
||||
yield output_dir
|
||||
|
||||
|
||||
# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="function", autouse=True)
def test_output_dir(request: pytest.FixtureRequest, top_output_dir: Path) -> Iterator[Path]:
    """Create the working directory for an individual test."""

    # one directory per test
    test_dir = get_test_output_dir(request, top_output_dir)
    log.info(f"test_output_dir is {test_dir}")
    shutil.rmtree(test_dir, ignore_errors=True)
    test_dir.mkdir()

    yield test_dir

    # Allure artifacts creation might involve the creation of `.tar.zst` archives,
    # which aren't going to be used if Allure results collection is not enabled
    # (i.e. --alluredir is not set).
    # Skip `allure_attach_from_dir` in this case
    if not request.config.getoption("--alluredir"):
        return

    preserve_database_files = False
    for k, v in request.node.user_properties:
        # NB: the neon_env_builder fixture uses this fixture (test_output_dir).
        # So, neon_env_builder's cleanup runs before here.
        # The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
        if k == "preserve_database_files":
            assert isinstance(v, bool)
            preserve_database_files = v

    allure_attach_from_dir(test_dir, preserve_database_files)

class FileAndThreadLock:
    def __init__(self, path: Path):
        self.path = path
        self.thread_lock = threading.Lock()
        self.fd: Optional[int] = None

    def __enter__(self):
        self.fd = os.open(self.path, os.O_CREAT | os.O_WRONLY)
        # lock thread lock before file lock so that there's no race
        # around flocking / funlocking the file lock
        self.thread_lock.acquire()
        flock(self.fd, LOCK_EX)

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ):
        assert self.fd is not None
        assert self.thread_lock.locked()  # ... by us
        flock(self.fd, LOCK_UN)
        self.thread_lock.release()
        os.close(self.fd)
        self.fd = None

class SnapshotDirLocked:
    def __init__(self, parent: SnapshotDir):
        self._parent = parent

    def is_initialized(self):
        # TODO: in the future, take a `tag` as argument and store it in the marker in set_initialized.
        # Then, in this function, compare marker file contents with the tag to invalidate the snapshot if the tag changed.
        return self._parent.marker_file_path.exists()

    def set_initialized(self):
        self._parent.marker_file_path.write_text("")

    @property
    def path(self) -> Path:
        return self._parent.path / "snapshot"


class SnapshotDir:
    _path: Path

    def __init__(self, path: Path):
        self._path = path
        assert self._path.is_dir()
        self._lock = FileAndThreadLock(self.lock_file_path)

    @property
    def path(self) -> Path:
        return self._path

    @property
    def lock_file_path(self) -> Path:
        return self._path / "initializing.flock"

    @property
    def marker_file_path(self) -> Path:
        return self._path / "initialized.marker"

    def __enter__(self) -> SnapshotDirLocked:
        self._lock.__enter__()
        return SnapshotDirLocked(self)

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ):
        self._lock.__exit__(exc_type, exc_value, exc_traceback)


def shared_snapshot_dir(top_output_dir: Path, ident: str) -> SnapshotDir:
    snapshot_dir_path = get_shared_snapshot_dir_path(top_output_dir, ident)
    snapshot_dir_path.mkdir(exist_ok=True, parents=True)
    return SnapshotDir(snapshot_dir_path)

@pytest.fixture(scope="function")
|
||||
def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
|
||||
"""
|
||||
Idempotently create a test's overlayfs mount state directory.
|
||||
If the functionality isn't enabled via env var, returns None.
|
||||
|
||||
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
|
||||
"""
|
||||
|
||||
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
|
||||
return None
|
||||
|
||||
overlay_dir = get_test_overlay_dir(request, top_output_dir)
|
||||
log.info(f"test_overlay_dir is {overlay_dir}")
|
||||
|
||||
overlay_dir.mkdir(exist_ok=True)
|
||||
# unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
|
||||
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
|
||||
cmd = ["sudo", "umount", str(mountpoint)]
|
||||
log.info(
|
||||
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
|
||||
)
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
# the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work.
|
||||
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
|
||||
overlay_dir.mkdir()
|
||||
|
||||
return overlay_dir
|
||||
|
||||
# no need to clean up anything: on clean shutdown,
|
||||
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
|
||||
# and on unclean shutdown, this function will take care of it
|
||||
# on the next test run
|
||||
@@ -2,14 +2,9 @@ from __future__ import annotations

import enum
import os
from typing import TYPE_CHECKING
from typing import Optional

import pytest
from typing_extensions import override

if TYPE_CHECKING:
    from typing import Optional


"""
This fixture is used to determine which version of Postgres to use for tests.
@@ -29,12 +24,10 @@ class PgVersion(str, enum.Enum):
    NOT_SET = "<-POSTRGRES VERSION IS NOT SET->"

    # Make it less confusing in logs
    @override
    def __repr__(self) -> str:
        return f"'{self.value}'"

    # Make this explicit for Python 3.11 compatibility, which changes the behavior of enums
    @override
    def __str__(self) -> str:
        return self.value

@@ -45,8 +38,7 @@ class PgVersion(str, enum.Enum):
        return f"v{self.value}"

    @classmethod
    @override
    def _missing_(cls, value: object) -> Optional[PgVersion]:
    def _missing_(cls, value) -> Optional[PgVersion]:
        known_values = {v.value for _, v in cls.__members__.items()}

        # Allow passing version as a string with "v" prefix (e.g. "v14")

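The `_missing_` hook makes the enum constructor tolerant of the "v" prefix, so both spellings resolve to the same member (assuming a member such as V14 = "14", which this hunk does not show):

    assert PgVersion("14") is PgVersion("v14")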
@@ -59,7 +59,10 @@ class PortDistributor:
        if isinstance(value, int):
            return self._replace_port_int(value)

        return self._replace_port_str(value)
        if isinstance(value, str):
            return self._replace_port_str(value)

        raise TypeError(f"unsupported type {type(value)} of {value=}")

    def _replace_port_int(self, value: int) -> int:
        known_port = self.port_map.get(value)
@@ -72,7 +75,7 @@ class PortDistributor:
        # Use regex to find port in a string
        # urllib.parse.urlparse produces inconvenient results for cases without scheme like "localhost:5432"
        # See https://bugs.python.org/issue27657
        ports: list[str] = re.findall(r":(\d+)(?:/|$)", value)
        ports = re.findall(r":(\d+)(?:/|$)", value)
        assert len(ports) == 1, f"can't find port in {value}"
        port_int = int(ports[0])

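The regex anchors the port to a following "/" or end-of-string, which copes with the scheme-less inputs that urlparse mishandles. For example:

    import re

    assert re.findall(r":(\d+)(?:/|$)", "postgresql://localhost:15432/postgres") == ["15432"]
    assert re.findall(r":(\d+)(?:/|$)", "localhost:5432") == ["5432"]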
@@ -13,7 +13,6 @@ import boto3
import toml
from moto.server import ThreadedMotoServer
from mypy_boto3_s3 import S3Client
from typing_extensions import override

from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
@@ -37,7 +36,6 @@ class RemoteStorageUser(str, enum.Enum):
    EXTENSIONS = "ext"
    SAFEKEEPER = "safekeeper"

    @override
    def __str__(self) -> str:
        return self.value

@@ -83,13 +81,11 @@ class LocalFsStorage:
    def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
        return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)

    def timeline_latest_generation(
        self, tenant_id: TenantId, timeline_id: TimelineId
    ) -> Optional[int]:
    def timeline_latest_generation(self, tenant_id, timeline_id):
        timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id))
        index_parts = [f for f in timeline_files if f.startswith("index_part")]

        def parse_gen(filename: str) -> Optional[int]:
        def parse_gen(filename):
            log.info(f"parsing index_part '{filename}'")
            parts = filename.split("-")
            if len(parts) == 2:
@@ -97,7 +93,7 @@ class LocalFsStorage:
            else:
                return None

        generations = sorted([parse_gen(f) for f in index_parts])  # type: ignore
        generations = sorted([parse_gen(f) for f in index_parts])
        if len(generations) == 0:
            raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}")
        return generations[-1]
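For concreteness: a later hunk in this file shows index files being written as f"{local_name}-{generation:08x}", so an index_part uploaded under generation 10 carries an 8-digit hex suffix, which is what parse_gen (its parsing branch is elided above) recovers:

    generation = 10
    assert f"index_part.json-{generation:08x}" == "index_part.json-0000000a"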
@@ -126,14 +122,14 @@ class LocalFsStorage:
        filename = f"{local_name}-{generation:08x}"
        return self.timeline_path(tenant_id, timeline_id) / filename

    def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any:
    def index_content(self, tenant_id: TenantId, timeline_id: TimelineId):
        with self.index_path(tenant_id, timeline_id).open("r") as f:
            return json.load(f)

    def heatmap_path(self, tenant_id: TenantId) -> Path:
        return self.tenant_path(tenant_id) / TENANT_HEATMAP_FILE_NAME

    def heatmap_content(self, tenant_id: TenantId) -> Any:
    def heatmap_content(self, tenant_id):
        with self.heatmap_path(tenant_id).open("r") as f:
            return json.load(f)

@@ -301,7 +297,7 @@ class S3Storage:
    def heatmap_key(self, tenant_id: TenantId) -> str:
        return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"

    def heatmap_content(self, tenant_id: TenantId) -> Any:
    def heatmap_content(self, tenant_id: TenantId):
        r = self.client.get_object(Bucket=self.bucket_name, Key=self.heatmap_key(tenant_id))
        return json.loads(r["Body"].read().decode("utf-8"))

@@ -321,7 +317,7 @@ class RemoteStorageKind(str, enum.Enum):
    def configure(
        self,
        repo_dir: Path,
        mock_s3_server: MockS3Server,
        mock_s3_server,
        run_id: str,
        test_name: str,
        user: RemoteStorageUser,
@@ -455,9 +451,15 @@ def default_remote_storage() -> RemoteStorageKind:


def remote_storage_to_toml_dict(remote_storage: RemoteStorage) -> dict[str, Any]:
    if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):
        raise Exception("invalid remote storage type")

    return remote_storage.to_toml_dict()


# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
    if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):
        raise Exception("invalid remote storage type")

    return remote_storage.to_toml_inline_table()
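As a usage sketch (the key name is an assumption; the helpers above only delegate to the storage object), the inline-table form is suitable for embedding on a single config line:

    # Hypothetical result for a LocalFsStorage, usable as one line of pageserver config:
    # remote_storage={local_path="/repo/local_fs_remote_storage/pageserver"}
    config_line = f"remote_storage={remote_storage_to_toml_inline_table(remote_storage)}"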
@@ -1,7 +1,7 @@
from __future__ import annotations

import re
from typing import TYPE_CHECKING
from typing import Any, Optional

import pytest
import requests
@@ -12,9 +12,6 @@ from werkzeug.wrappers.response import Response

from fixtures.log_helper import log

if TYPE_CHECKING:
    from typing import Any, Optional


class StorageControllerProxy:
    def __init__(self, server: HTTPServer):
@@ -37,7 +34,7 @@ def proxy_request(method: str, url: str, **kwargs) -> requests.Response:


@pytest.fixture(scope="function")
def storage_controller_proxy(make_httpserver: HTTPServer):
def storage_controller_proxy(make_httpserver):
    """
    Proxies requests into the storage controller to the currently
    selected storage controller instance via `StorageControllerProxy.route_to`.
@@ -51,7 +48,7 @@ def storage_controller_proxy(make_httpserver: HTTPServer):

    log.info(f"Storage controller proxy listening on {self.listen}")

    def handler(request: Request) -> Response:
    def handler(request: Request):
        if self.route_to is None:
            log.info(f"Storage controller proxy has no routing configured for {request.url}")
            return Response("Routing not configured", status=503)

@@ -18,7 +18,6 @@ from urllib.parse import urlencode
import allure
import zstandard
from psycopg2.extensions import cursor
from typing_extensions import override

from fixtures.log_helper import log
from fixtures.pageserver.common_types import (
@@ -27,45 +26,28 @@ from fixtures.pageserver.common_types import (
)

if TYPE_CHECKING:
    from collections.abc import Iterable
    from typing import IO, Optional
    from typing import (
        IO,
        Optional,
        Union,
    )

    from fixtures.common_types import TimelineId
    from fixtures.neon_fixtures import PgBin

WaitUntilRet = TypeVar("WaitUntilRet")

    from fixtures.common_types import TimelineId

Fn = TypeVar("Fn", bound=Callable[..., Any])
COMPONENT_BINARIES = {
    "storage_controller": ("storage_controller",),
    "storage_broker": ("storage_broker",),
    "compute": ("compute_ctl",),
    "safekeeper": ("safekeeper",),
    "pageserver": ("pageserver", "pagectl"),
}
# Disable auto-formatting for better readability
# fmt: off
VERSIONS_COMBINATIONS = (
    {"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"},
    {"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"},
    {"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"},
    {"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"},
    {"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"},
)
# fmt: on

def subprocess_capture(
    capture_dir: Path,
    cmd: list[str],
    *,
    check: bool = False,
    echo_stderr: bool = False,
    echo_stdout: bool = False,
    capture_stdout: bool = False,
    timeout: Optional[float] = None,
    with_command_header: bool = True,
    check=False,
    echo_stderr=False,
    echo_stdout=False,
    capture_stdout=False,
    timeout=None,
    with_command_header=True,
    **popen_kwargs: Any,
) -> tuple[str, Optional[str], int]:
    """Run a process and bifurcate its output to files and the `log` logger
@@ -102,7 +84,6 @@ def subprocess_capture(
            self.capture = capture
            self.captured = ""

        @override
        def run(self):
            first = with_command_header
            for line in self.in_file:
@@ -184,10 +165,10 @@ def global_counter() -> int:
def print_gc_result(row: dict[str, Any]):
    log.info("GC duration {elapsed} ms".format_map(row))
    log.info(
        (
            " total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}"
            " needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
        ).format_map(row)
        " total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}"
        " needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}".format_map(
            row
        )
    )

@@ -245,7 +226,7 @@ def get_scale_for_db(size_mb: int) -> int:
    return round(0.06689 * size_mb - 0.5)

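A worked example of the formula, whose factor corresponds to roughly 15 MB per pgbench scale unit:

    # round(0.06689 * 1024 - 0.5) == round(67.995...) == 68
    assert get_scale_for_db(1024) == 68  # ~1 GiB database -> pgbench scale 68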
ATTACHMENT_NAME_REGEX: re.Pattern[str] = re.compile(
ATTACHMENT_NAME_REGEX: re.Pattern = re.compile(  # type: ignore[type-arg]
    r"regression\.(diffs|out)|.+\.(?:log|stderr|stdout|filediff|metrics|html|walredo)"
)

@@ -308,7 +289,7 @@ LOGS_STAGING_DATASOURCE_ID = "xHHYY0dVz"

def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int, end_ms: int):
    """Add links to server logs in Grafana to Allure report"""
    links: dict[str, str] = {}
    links = {}
    # We expect host to be in format like ep-divine-night-159320.us-east-2.aws.neon.build
    endpoint_id, region_id, _ = host.split(".", 2)

@@ -360,7 +341,7 @@ def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int,


def start_in_background(
    command: list[str], cwd: Path, log_file_name: str, is_started: Callable[[], WaitUntilRet]
    command: list[str], cwd: Path, log_file_name: str, is_started: Fn
) -> subprocess.Popen[bytes]:
    """Starts a process, creates the logfile and redirects stderr and stdout there. Runs the start checks before the process is started, or errors."""

@@ -395,11 +376,14 @@ def start_in_background(
    return spawned_process


WaitUntilRet = TypeVar("WaitUntilRet")


def wait_until(
    number_of_iterations: int,
    interval: float,
    func: Callable[[], WaitUntilRet],
    show_intermediate_error: bool = False,
    show_intermediate_error=False,
) -> WaitUntilRet:
    """
    Wait until 'func' returns successfully, without exception. Returns the
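Usage follows from the signature: the return value of the first successful call is propagated, and if no call ever succeeds the last error surfaces. For example (the HTTP client and ids are stand-ins):

    # Poll for up to 20 * 0.5 = 10 seconds.
    detail = wait_until(20, 0.5, lambda: pageserver_http.timeline_detail(tenant_id, timeline_id))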
@@ -480,7 +464,7 @@ def humantime_to_ms(humantime: str) -> float:
def scan_log_for_errors(input: Iterable[str], allowed_errors: list[str]) -> list[tuple[int, str]]:
    # FIXME: this duplicates test_runner/fixtures/pageserver/allowed_errors.py
    error_or_warn = re.compile(r"\s(ERROR|WARN)")
    errors: list[tuple[int, str]] = []
    errors = []
    for lineno, line in enumerate(input, start=1):
        if len(line) == 0:
            continue
@@ -500,7 +484,7 @@ def scan_log_for_errors(input: Iterable[str], allowed_errors: list[str]) -> list
    return errors


def assert_no_errors(log_file: Path, service: str, allowed_errors: list[str]):
def assert_no_errors(log_file, service, allowed_errors):
    if not log_file.exists():
        log.warning(f"Skipping {service} log check: {log_file} does not exist")
        return
@@ -520,11 +504,9 @@ class AuxFileStore(str, enum.Enum):
    V2 = "v2"
    CrossValidation = "cross-validation"

    @override
    def __repr__(self) -> str:
        return f"'aux-{self.value}'"

    @override
    def __str__(self) -> str:
        return f"'aux-{self.value}'"

@@ -543,7 +525,7 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: set[str
    """
    started_at = time.time()

    def hash_extracted(reader: Optional[IO[bytes]]) -> bytes:
    def hash_extracted(reader: Union[IO[bytes], None]) -> bytes:
        assert reader is not None
        digest = sha256(usedforsecurity=False)
        while True:
@@ -568,7 +550,7 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: set[str
        right_list
    ), f"unexpected number of files on tar files, {len(left_list)} != {len(right_list)}"

    mismatching: set[str] = set()
    mismatching = set()

    for left_tuple, right_tuple in zip(left_list, right_list):
        left_path, left_hash = left_tuple
@@ -593,7 +575,6 @@ class PropagatingThread(threading.Thread):
    Simple Thread wrapper with join() propagating the possible exception in the thread.
    """

    @override
    def run(self):
        self.exc = None
        try:
@@ -601,8 +582,7 @@ class PropagatingThread(threading.Thread):
        except BaseException as e:
            self.exc = e

    @override
    def join(self, timeout: Optional[float] = None) -> Any:
    def join(self, timeout=None):
        super().join(timeout)
        if self.exc:
            raise self.exc
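Unlike a plain threading.Thread, an exception raised inside the target surfaces at the join() call site, which is usually what a test wants:

    t = PropagatingThread(target=lambda: 1 / 0)
    t.start()
    t.join()  # raises ZeroDivisionError here instead of swallowing it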
@@ -624,19 +604,3 @@ def human_bytes(amt: float) -> str:
        amt = amt / 1024

    raise RuntimeError("unreachable")

def allpairs_versions():
    """
    Returns a dictionary with arguments for pytest parametrize,
    for testing compatibility with the previous version of Neon components.
    The combinations were pre-computed to cover all pairs of components
    with different versions.
    """
    ids = []
    for pair in VERSIONS_COMBINATIONS:
        cur_id = []
        for component in sorted(pair.keys()):
            cur_id.append(pair[component][0])
        ids.append(f"combination_{''.join(cur_id)}")
    return {"argnames": "combination", "argvalues": VERSIONS_COMBINATIONS, "ids": ids}

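This is consumed via pytest.mark.parametrize, as test_versions_mismatch does later in this diff:

    @pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
    def test_something(combination):
        # `combination` is one dict from VERSIONS_COMBINATIONS,
        # e.g. {"storage_controller": "new", ..., "pageserver": "old"}
        ...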
@@ -1,7 +1,7 @@
from __future__ import annotations

import threading
from typing import TYPE_CHECKING
from typing import Any, Optional

from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
@@ -14,9 +14,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.utils import wait_for_last_record_lsn

if TYPE_CHECKING:
    from typing import Any, Optional

# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
ENDPOINT_LOCK = threading.Lock()
@@ -103,7 +100,7 @@ class Workload:
            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
        )

    def write_rows(self, n: int, pageserver_id: Optional[int] = None, upload: bool = True):
    def write_rows(self, n, pageserver_id: Optional[int] = None, upload: bool = True):
        endpoint = self.endpoint(pageserver_id)
        start = self.expect_rows
        end = start + n - 1
@@ -124,9 +121,7 @@ class Workload:
        else:
            return False

    def churn_rows(
        self, n: int, pageserver_id: Optional[int] = None, upload: bool = True, ingest: bool = True
    ):
    def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
        assert self.expect_rows >= n

        max_iters = 10

@@ -4,10 +4,9 @@ import concurrent.futures
import random
import time
from collections import defaultdict
from enum import Enum

import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
@@ -35,7 +34,6 @@ def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[
        if tenant_placement[tid]["intent"]["attached"]
        == tenant_placement[tid]["observed"]["attached"]
    }

    assert len(matching) == total_shards

    attached_per_node: defaultdict[str, int] = defaultdict(int)
@@ -109,48 +107,15 @@ def test_storage_controller_many_tenants(
        ps.allowed_errors.append(".*request was dropped before completing.*")

    # Total tenants
    small_tenant_count = 7800
    large_tenant_count = 200
    tenant_count = small_tenant_count + large_tenant_count
    large_tenant_shard_count = 8
    total_shards = small_tenant_count + large_tenant_count * large_tenant_shard_count
    tenant_count = 4000

    # A small stripe size to encourage all shards to get some data
    stripe_size = 1
    # Shards per tenant
    shard_count = 2
    stripe_size = 1024

    # We use a fixed seed to make the test somewhat reproducible: we want a randomly
    # chosen order in the sense that it's arbitrary, but not in the sense that it should change every run.
    rng = random.Random(1234)
    total_shards = tenant_count * shard_count

    class Tenant:
        def __init__(self):
            # Tenants may optionally contain a timeline
            self.timeline_id = None

            # Tenants may be marked as 'large' to get multiple shards during the creation phase
            self.large = False

    tenant_ids = list(TenantId.generate() for _i in range(0, tenant_count))
    tenants = dict((tid, Tenant()) for tid in tenant_ids)

    # We will create timelines in only a subset of tenants, because creating timelines
    # does many megabytes of IO, and we want to densely simulate huge tenant counts on
    # a single test node.
    tenant_timelines_count = 100

    # These lists are maintained for use with rng.choice
    tenants_with_timelines = list(rng.sample(tenants.keys(), tenant_timelines_count))
    tenants_without_timelines = list(
        tenant_id for tenant_id in tenants if tenant_id not in tenants_with_timelines
    )

    # For our sharded tenants, we will make half of them with timelines and half without
    assert large_tenant_count >= tenant_timelines_count / 2
    for tenant_id in tenants_with_timelines[0 : large_tenant_count // 2]:
        tenants[tenant_id].large = True

    for tenant_id in tenants_without_timelines[0 : large_tenant_count // 2]:
        tenants[tenant_id].large = True
    tenants = set(TenantId.generate() for _i in range(0, tenant_count))

    virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)

@@ -160,39 +125,23 @@ def test_storage_controller_many_tenants(

        rss = env.storage_controller.get_metric_value("process_resident_memory_bytes")
        assert rss is not None
        log.info(f"Resident memory: {rss} ({ rss / total_shards} per shard)")
        assert rss < expect_memory_per_shard * total_shards
        log.info(f"Resident memory: {rss} ({ rss / (shard_count * tenant_count)} per shard)")
        assert rss < expect_memory_per_shard * shard_count * tenant_count

    # We use a fixed seed to make the test somewhat reproducible: we want a randomly
    # chosen order in the sense that it's arbitrary, but not in the sense that it should change every run.
    rng = random.Random(1234)

    # Issue more concurrent operations than the storage controller's reconciler concurrency semaphore
    # permits, to ensure that we are really stressing that limit.
    api_concurrency = 135

    # A different concurrency limit for bulk tenant+timeline creations: these do I/O and will
    # start timing out on test nodes if we aren't a bit careful.
    create_concurrency = 16

    class Operation(str, Enum):
        TIMELINE_OPS = "timeline_ops"
        SHARD_MIGRATE = "shard_migrate"
        TENANT_PASSTHROUGH = "tenant_passthrough"

    run_ops = api_concurrency * 4
    assert run_ops < len(tenants)

    # Creation phase: make a lot of tenants, and create timelines in a subset of them
    # This executor has concurrency set modestly, to avoid overloading pageservers with timeline creations.
    with concurrent.futures.ThreadPoolExecutor(max_workers=create_concurrency) as executor:
        tenant_create_futs = []
    # We will create tenants directly via API, not via neon_local, to avoid any false
    # serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
    with concurrent.futures.ThreadPoolExecutor(max_workers=api_concurrency) as executor:
        futs = []
        t1 = time.time()

        for tenant_id, tenant in tenants.items():
            if tenant.large:
                shard_count = large_tenant_shard_count
            else:
                shard_count = 1

        # We will create tenants directly via API, not via neon_local, to avoid any false
        # serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
        for tenant_id in tenants:
            f = executor.submit(
                env.storage_controller.tenant_create,
                tenant_id,
@@ -203,106 +152,44 @@ def test_storage_controller_many_tenants(
                tenant_config={"heatmap_period": "10s"},
                placement_policy={"Attached": 1},
            )
            tenant_create_futs.append(f)
            futs.append(f)

        # Wait for tenant creations to finish
        for f in tenant_create_futs:
        # Wait for creations to finish
        for f in futs:
            f.result()
        log.info(
            f"Created {len(tenants)} tenants in {time.time() - t1}, {len(tenants) / (time.time() - t1)}/s"
        )

        # Waiting for optimizer to stabilize, if it disagrees with scheduling (the correct behavior
        # would be for original scheduling decisions to always match optimizer's preference)
        # (workaround for https://github.com/neondatabase/neon/issues/8969)
        env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)

        # Create timelines in those tenants which are going to get one
        t1 = time.time()
        timeline_create_futs = []
        for tenant_id in tenants_with_timelines:
            timeline_id = TimelineId.generate()
            tenants[tenant_id].timeline_id = timeline_id
            f = executor.submit(
                env.storage_controller.pageserver_api().timeline_create,
                PgVersion.NOT_SET,
                tenant_id,
                timeline_id,
            )
            timeline_create_futs.append(f)

        for f in timeline_create_futs:
            f.result()
        log.info(
            f"Created {len(tenants_with_timelines)} timelines in {time.time() - t1}, {len(tenants_with_timelines) / (time.time() - t1)}/s"
        )

    # Plan operations: ensure each tenant with a timeline gets at least
    # one of each operation type. Then add other tenants to make up the
    # numbers.
    ops_plan = []
    for tenant_id in tenants_with_timelines:
        ops_plan.append((tenant_id, Operation.TIMELINE_OPS))
        ops_plan.append((tenant_id, Operation.SHARD_MIGRATE))
        ops_plan.append((tenant_id, Operation.TENANT_PASSTHROUGH))

    # Fill up remaining run_ops with migrations of tenants without timelines
    other_migrate_tenants = rng.sample(tenants_without_timelines, run_ops - len(ops_plan))

    for tenant_id in other_migrate_tenants:
        ops_plan.append(
            (
                tenant_id,
                rng.choice([Operation.SHARD_MIGRATE, Operation.TENANT_PASSTHROUGH]),
            )
        )

    # Exercise phase: pick pseudo-random operations to do on the tenants + timelines
    # This executor has concurrency high enough to stress the storage controller API.
    with concurrent.futures.ThreadPoolExecutor(max_workers=api_concurrency) as executor:

        def exercise_timeline_ops(tenant_id, timeline_id):
            # A read operation: this requires looking up shard zero and routing there
            detail = virtual_ps_http.timeline_detail(tenant_id, timeline_id)
            assert detail["timeline_id"] == str(timeline_id)

            # A fan-out write operation to all shards in a tenant.
            # - We use a metadata operation rather than something like a timeline create, because
            #   timeline creations are I/O intensive and this test isn't meant to be a stress test for
            #   doing lots of concurrent timeline creations.
            archival_state = rng.choice(
                [TimelineArchivalState.ARCHIVED, TimelineArchivalState.UNARCHIVED]
            )
            virtual_ps_http.timeline_archival_config(tenant_id, timeline_id, archival_state)
        run_ops = api_concurrency * 4
        assert run_ops < len(tenants)
        op_tenants = list(tenants)[0:run_ops]

        # Generate a mixture of operations and dispatch them all concurrently
        futs = []
        for tenant_id, op in ops_plan:
            if op == Operation.TIMELINE_OPS:
                op_timeline_id = tenants[tenant_id].timeline_id
                assert op_timeline_id is not None

        # Exercise operations that modify tenant scheduling state but require traversing
        # the fan-out-to-all-shards functionality.
        for tenant_id in op_tenants:
            op = rng.choice([0, 1, 2])
            if op == 0:
                # A fan-out write operation to all shards in a tenant (timeline creation)
                f = executor.submit(
                    exercise_timeline_ops,
                    virtual_ps_http.timeline_create,
                    PgVersion.NOT_SET,
                    tenant_id,
                    op_timeline_id,
                    TimelineId.generate(),
                )
            elif op == Operation.SHARD_MIGRATE:
            elif op == 1:
                # A reconciler operation: migrate a shard.
                desc = env.storage_controller.tenant_describe(tenant_id)

                shard_number = rng.randint(0, len(desc["shards"]) - 1)
                tenant_shard_id = TenantShardId(tenant_id, shard_number, len(desc["shards"]))
                shard_number = rng.randint(0, shard_count - 1)
                tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)

                # Migrate it to its secondary location
                desc = env.storage_controller.tenant_describe(tenant_id)
                dest_ps_id = desc["shards"][shard_number]["node_secondary"][0]

                f = executor.submit(
                    env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
                )
            elif op == Operation.TENANT_PASSTHROUGH:
            elif op == 2:
                # A passthrough read to shard zero
                f = executor.submit(virtual_ps_http.tenant_status, tenant_id)

@@ -312,18 +199,10 @@ def test_storage_controller_many_tenants(
        for f in futs:
            f.result()

    log.info("Completed mixed operations phase")

    # Some of the operations above (notably migrations) might leave the controller in a state where it has
    # some work to do, for example optimizing shard placement after we do a random migration. Wait for the system
    # to reach a quiescent state before doing following checks.
    #
    # - Set max_interval low because we probably have a significant number of optimizations to complete and would like
    #   the test to run quickly.
    # - Set timeout high because we might be waiting for optimizations that require a secondary
    #   to warm up, and if we just started a secondary in the previous step, it might wait some time
    #   before downloading its heatmap
    env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
    env.storage_controller.reconcile_until_idle()

    env.storage_controller.consistency_check()
    check_memory()
@@ -334,7 +213,6 @@ def test_storage_controller_many_tenants(
    #
    # We do not require that the system is quiescent already here, although at present in this point in the test
    # that may be the case.
    log.info("Reconciling all & timing")
    while True:
        t1 = time.time()
        reconcilers = env.storage_controller.reconcile_all()
@@ -347,7 +225,6 @@ def test_storage_controller_many_tenants(
            break

    # Restart the storage controller
    log.info("Restarting controller")
    env.storage_controller.stop()
    env.storage_controller.start()

@@ -369,16 +246,7 @@ def test_storage_controller_many_tenants(

    # Restart pageservers gracefully: this exercises the /re-attach pageserver API
    # and the storage controller drain and fill API
    log.info("Restarting pageservers...")

    # Parameters for how long we expect it to take to migrate all of the tenants from/to
    # a node during a drain/fill operation
    DRAIN_FILL_TIMEOUT = 240
    DRAIN_FILL_BACKOFF = 5

    for ps in env.pageservers:
        log.info(f"Draining pageserver {ps.id}")
        t1 = time.time()
        env.storage_controller.retryable_node_operation(
            lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
        )
@@ -387,10 +255,9 @@ def test_storage_controller_many_tenants(
            ps.id,
            PageserverAvailability.ACTIVE,
            PageserverSchedulingPolicy.PAUSE_FOR_RESTART,
            max_attempts=DRAIN_FILL_TIMEOUT // DRAIN_FILL_BACKOFF,
            backoff=DRAIN_FILL_BACKOFF,
            max_attempts=24,
            backoff=5,
        )
        log.info(f"Drained pageserver {ps.id} in {time.time() - t1}s")

        shard_counts = get_consistent_node_shard_counts(env, total_shards)
        log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
@@ -408,7 +275,6 @@ def test_storage_controller_many_tenants(
            backoff=1,
        )

        log.info(f"Filling pageserver {ps.id}")
        env.storage_controller.retryable_node_operation(
            lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
        )
@@ -416,23 +282,16 @@ def test_storage_controller_many_tenants(
            ps.id,
            PageserverAvailability.ACTIVE,
            PageserverSchedulingPolicy.ACTIVE,
            max_attempts=DRAIN_FILL_TIMEOUT // DRAIN_FILL_BACKOFF,
            backoff=DRAIN_FILL_BACKOFF,
            max_attempts=24,
            backoff=5,
        )

        log.info(f"Filled pageserver {ps.id} in {time.time() - t1}s")

        # Waiting for optimizer to stabilize, if it disagrees with scheduling (the correct behavior
        # would be for original scheduling decisions to always match optimizer's preference)
        # (workaround for https://github.com/neondatabase/neon/issues/8969)
        env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)

        shard_counts = get_consistent_node_shard_counts(env, total_shards)
        log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")

        assert_consistent_balanced_attachments(env, total_shards)

        env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
        env.storage_controller.reconcile_until_idle()
        env.storage_controller.consistency_check()

    # Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,

@@ -4,7 +4,7 @@ import enum
import json
import os
import time
from typing import TYPE_CHECKING
from typing import Optional

import pytest
from fixtures.log_helper import log
@@ -16,10 +16,6 @@ from fixtures.pageserver.http import PageserverApiException
from fixtures.utils import wait_until
from fixtures.workload import Workload

if TYPE_CHECKING:
    from typing import Optional


AGGRESIVE_COMPACTION_TENANT_CONF = {
    # Disable gc and compaction. The test runs compaction manually.
    "gc_period": "0s",

@@ -9,7 +9,6 @@ from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING

import fixtures.utils
import pytest
import toml
from fixtures.common_types import TenantId, TimelineId
@@ -94,34 +93,6 @@ if TYPE_CHECKING:
# # Run forward compatibility test
# ./scripts/pytest -k test_forward_compatibility
#
#
# How to run `test_version_mismatch` locally:
#
# export DEFAULT_PG_VERSION=16
# export BUILD_TYPE=release
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
# export NEON_BIN=target/release
# export POSTGRES_DISTRIB_DIR=pg_install
#
# # Build previous version of binaries and store them somewhere:
# rm -rf pg_install target
# git checkout <previous version>
# CARGO_BUILD_FLAGS="--features=testing" make -s -j`nproc`
# mkdir -p neon_previous/target
# cp -a target/${BUILD_TYPE} ./neon_previous/target/${BUILD_TYPE}
# cp -a pg_install ./neon_previous/pg_install
#
# # Build current version of binaries and create a data snapshot:
# rm -rf pg_install target
# git checkout <current version>
# CARGO_BUILD_FLAGS="--features=testing" make -s -j`nproc`
# ./scripts/pytest -k test_create_snapshot
#
# # Run the version mismatch test
# ./scripts/pytest -k test_version_mismatch

check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
    os.environ.get("CHECK_ONDISK_DATA_COMPATIBILITY") is None,
@@ -195,11 +166,16 @@ def test_backward_compatibility(
    neon_env_builder: NeonEnvBuilder,
    test_output_dir: Path,
    pg_version: PgVersion,
    compatibility_snapshot_dir: Path,
):
    """
    Test that the new binaries can read old data
    """
    compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
    assert (
        compatibility_snapshot_dir_env is not None
    ), f"COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg{pg_version.v_prefixed}` path generated by test_create_snapshot (ideally generated by the previous version of Neon)"
    compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()

    breaking_changes_allowed = (
        os.environ.get("ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
    )
@@ -238,11 +214,27 @@ def test_forward_compatibility(
    test_output_dir: Path,
    top_output_dir: Path,
    pg_version: PgVersion,
    compatibility_snapshot_dir: Path,
):
    """
    Test that the old binaries can read new data
    """
    compatibility_neon_bin_env = os.environ.get("COMPATIBILITY_NEON_BIN")
    assert compatibility_neon_bin_env is not None, (
        "COMPATIBILITY_NEON_BIN is not set. It should be set to a path with Neon binaries "
        "(ideally generated by the previous version of Neon)"
    )
    compatibility_neon_bin = Path(compatibility_neon_bin_env).resolve()

    compatibility_postgres_distrib_dir_env = os.environ.get("COMPATIBILITY_POSTGRES_DISTRIB_DIR")
    assert (
        compatibility_postgres_distrib_dir_env is not None
    ), "COMPATIBILITY_POSTGRES_DISTRIB_DIR is not set. It should be set to a pg_install directory (ideally generated by the previous version of Neon)"
    compatibility_postgres_distrib_dir = Path(compatibility_postgres_distrib_dir_env).resolve()

    compatibility_snapshot_dir = (
        top_output_dir / f"compatibility_snapshot_pg{pg_version.v_prefixed}"
    )

    breaking_changes_allowed = (
        os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
    )
@@ -253,14 +245,9 @@ def test_forward_compatibility(
    # Use previous version's production binaries (pageserver, safekeeper, pg_distrib_dir, etc.).
    # But always use the current version's neon_local binary.
    # This is because we want to test the compatibility of the data format, not the compatibility of the neon_local CLI.
    assert (
        neon_env_builder.compatibility_neon_binpath is not None
    ), "the environment variable COMPATIBILITY_NEON_BIN is required"
    assert (
        neon_env_builder.compatibility_pg_distrib_dir is not None
    ), "the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required"
    neon_env_builder.neon_binpath = neon_env_builder.compatibility_neon_binpath
    neon_env_builder.pg_distrib_dir = neon_env_builder.compatibility_pg_distrib_dir
    neon_env_builder.neon_binpath = compatibility_neon_bin
    neon_env_builder.pg_distrib_dir = compatibility_postgres_distrib_dir
    neon_env_builder.neon_local_binpath = neon_env_builder.neon_local_binpath

    env = neon_env_builder.from_repo_dir(
        compatibility_snapshot_dir / "repo",
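The hunk above reverts the builder to resolving the previous release's binaries from environment variables rather than from builder attributes. A minimal sketch of the underlying env-var pattern, grounded in the diff itself (the helper name `resolve_compat_paths` is illustrative, not from the repo):

```python
import os
from pathlib import Path


def resolve_compat_paths() -> tuple[Path, Path]:
    """Resolve the previous release's Neon binaries and pg_install dir.

    Mirrors the shape used in the hunk above: read the env var, fail
    loudly if it is unset, then normalize to an absolute path.
    """
    neon_bin = os.environ.get("COMPATIBILITY_NEON_BIN")
    assert neon_bin is not None, "COMPATIBILITY_NEON_BIN is not set"

    pg_dir = os.environ.get("COMPATIBILITY_POSTGRES_DISTRIB_DIR")
    assert pg_dir is not None, "COMPATIBILITY_POSTGRES_DISTRIB_DIR is not set"

    return Path(neon_bin).resolve(), Path(pg_dir).resolve()
```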
@@ -571,29 +558,3 @@ def test_historic_storage_formats(
    env.pageserver.http_client().timeline_compact(
        dataset.tenant_id, existing_timeline_id, force_image_layer_creation=True
    )


@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_versions_mismatch(
    neon_env_builder: NeonEnvBuilder,
    test_output_dir: Path,
    pg_version: PgVersion,
    compatibility_snapshot_dir,
    combination,
):
    """
    Checks compatibility of different combinations of versions of the components
    """
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.from_repo_dir(
        compatibility_snapshot_dir / "repo",
    )
    env.pageserver.allowed_errors.extend(
        [".*ingesting record with timestamp lagging more than wait_lsn_timeout.+"]
    )
    env.start()
    check_neon_works(
        env, test_output_dir, compatibility_snapshot_dir / "dump.sql", test_output_dir / "repo"
    )
@@ -162,11 +162,6 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
    env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID)
    env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 1)

    # We will stop the storage controller while it may have requests in
    # flight, and the pageserver complains when requests are abandoned.
    for ps in env.pageservers:
        ps.allowed_errors.append(".*request was dropped before completing.*")

    # Keep NeonEnv state up to date, it usually owns starting/stopping services
    env.pageservers[0].running = False
    env.pageservers[1].running = False
@@ -15,7 +15,7 @@ import enum
import os
import re
import time
from typing import TYPE_CHECKING
from typing import Optional

import pytest
from fixtures.common_types import TenantId, TimelineId
@@ -40,10 +40,6 @@ from fixtures.remote_storage import (
from fixtures.utils import wait_until
from fixtures.workload import Workload

if TYPE_CHECKING:
    from typing import Optional


# A tenant configuration that is convenient for generating uploads and deletions
# without a large amount of postgres traffic.
TENANT_CONF = {
@@ -23,7 +23,6 @@ from fixtures.remote_storage import s3_storage
from fixtures.utils import wait_until
from fixtures.workload import Workload
from pytest_httpserver import HTTPServer
from typing_extensions import override
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -955,7 +954,6 @@ class PageserverFailpoint(Failure):
        self.pageserver_id = pageserver_id
        self._mitigate = mitigate

    @override
    def apply(self, env: NeonEnv):
        pageserver = env.get_pageserver(self.pageserver_id)
        pageserver.allowed_errors.extend(
@@ -963,23 +961,19 @@ class PageserverFailpoint(Failure):
        )
        pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))

    @override
    def clear(self, env: NeonEnv):
        pageserver = env.get_pageserver(self.pageserver_id)
        pageserver.http_client().configure_failpoints((self.failpoint, "off"))
        if self._mitigate:
            env.storage_controller.node_configure(self.pageserver_id, {"availability": "Active"})

    @override
    def expect_available(self):
        return True

    @override
    def can_mitigate(self):
        return self._mitigate

    @override
    def mitigate(self, env: NeonEnv):
    def mitigate(self, env):
        env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})
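The failure-injection protocol above drives pageserver failpoints over the HTTP client: `return(1)` arms a failpoint, `off` disarms it. A small sketch of the arm/act/disarm shape, assuming only the `configure_failpoints((name, action))` call visible in the hunk (the wrapper itself is illustrative):

```python
def with_failpoint(http_client, failpoint: str, act):
    """Arm a failpoint, run `act`, and always disarm afterwards."""
    http_client.configure_failpoints((failpoint, "return(1)"))  # arm
    try:
        return act()
    finally:
        # Disarm even if `act` raised, so later tests see a clean server.
        http_client.configure_failpoints((failpoint, "off"))
```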
@@ -989,11 +983,9 @@ class StorageControllerFailpoint(Failure):
        self.pageserver_id = None
        self.action = action

    @override
    def apply(self, env: NeonEnv):
        env.storage_controller.configure_failpoints((self.failpoint, self.action))

    @override
    def clear(self, env: NeonEnv):
        if "panic" in self.action:
            log.info("Restarting storage controller after panic")
@@ -1002,19 +994,16 @@ class StorageControllerFailpoint(Failure):
        else:
            env.storage_controller.configure_failpoints((self.failpoint, "off"))

    @override
    def expect_available(self):
        # Controller panics _do_ leave pageservers available, but our test code relies
        # on using the locate API to update configurations in Workload, so we must skip
        # these actions when the controller has been panicked.
        return "panic" not in self.action

    @override
    def can_mitigate(self):
        return False

    @override
    def fails_forward(self, env: NeonEnv):
    def fails_forward(self, env):
        # Edge case: the very last failpoint that simulates a DB connection error, where
        # the abort path will fail-forward and result in a complete split.
        fail_forward = self.failpoint == "shard-split-post-complete"
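The `expect_available` comment above encodes a real scheduling constraint: after a controller panic the pageservers still serve traffic, but any step that needs the controller's locate API must be skipped. A sketch of how a test loop might consume that flag (`apply`/`clear`/`expect_available` follow the diff; the loop and `workload.validate` call are illustrative assumptions):

```python
def exercise(failure, workload, env):
    failure.apply(env)
    try:
        if failure.expect_available():
            # Safe only while the storage controller can still answer
            # locate requests; skipped after a simulated panic.
            workload.validate(env)
    finally:
        failure.clear(env)
```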
@@ -1028,7 +1017,6 @@ class StorageControllerFailpoint(Failure):

        return fail_forward

    @override
    def expect_exception(self):
        if "panic" in self.action:
            return requests.exceptions.ConnectionError
@@ -1041,22 +1029,18 @@ class NodeKill(Failure):
        self.pageserver_id = pageserver_id
        self._mitigate = mitigate

    @override
    def apply(self, env: NeonEnv):
        pageserver = env.get_pageserver(self.pageserver_id)
        pageserver.stop(immediate=True)

    @override
    def clear(self, env: NeonEnv):
        pageserver = env.get_pageserver(self.pageserver_id)
        pageserver.start()

    @override
    def expect_available(self):
        return False

    @override
    def mitigate(self, env: NeonEnv):
    def mitigate(self, env):
        env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})
@@ -1075,26 +1059,21 @@ class CompositeFailure(Failure):
                self.pageserver_id = f.pageserver_id
                break

    @override
    def apply(self, env: NeonEnv):
        for f in self.failures:
            f.apply(env)

    @override
    def clear(self, env: NeonEnv):
    def clear(self, env):
        for f in self.failures:
            f.clear(env)

    @override
    def expect_available(self):
        return all(f.expect_available() for f in self.failures)

    @override
    def mitigate(self, env: NeonEnv):
    def mitigate(self, env):
        for f in self.failures:
            f.mitigate(env)

    @override
    def expect_exception(self):
        expect = set(f.expect_exception() for f in self.failures)
@@ -1232,7 +1211,7 @@ def test_sharding_split_failures(

        assert attached_count == initial_shard_count

    def assert_split_done(exclude_ps_id: Optional[int] = None) -> None:
    def assert_split_done(exclude_ps_id=None) -> None:
        secondary_count = 0
        attached_count = 0
        for ps in env.pageservers:
@@ -9,7 +9,6 @@ from datetime import datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING

import fixtures.utils
import pytest
from fixtures.auth_tokens import TokenScope
from fixtures.common_types import TenantId, TenantShardId, TimelineId
@@ -39,11 +38,7 @@ from fixtures.pg_version import PgVersion, run_only_on_default_postgres
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.storage_controller_proxy import StorageControllerProxy
from fixtures.utils import (
    run_pg_bench_small,
    subprocess_capture,
    wait_until,
)
from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until
from fixtures.workload import Workload
from mypy_boto3_s3.type_defs import (
    ObjectTypeDef,
@@ -65,8 +60,9 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
    return counts


@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
def test_storage_controller_smoke(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Test the basic lifecycle of a storage controller:
    - Restarting
@@ -1042,7 +1038,7 @@ def test_storage_controller_tenant_deletion(
    )

    # Break the compute hook: we are checking that deletion does not depend on the compute hook being available
    def break_hook(_body: Any):
    def break_hook():
        raise RuntimeError("Unexpected call to compute hook")

    compute_reconfigure_listener.register_on_notify(break_hook)
@@ -1304,11 +1300,11 @@ def test_storage_controller_heartbeats(
    node_to_tenants = build_node_to_tenants_map(env)
    log.info(f"Back online: {node_to_tenants=}")

    # ... background reconciliation may need to run to clean up the location on the node that was offline
    env.storage_controller.reconcile_until_idle()

    # ... expecting the storage controller to reach a consistent state
    env.storage_controller.consistency_check()
    def storage_controller_consistent():
        env.storage_controller.consistency_check()

    wait_until(30, 1, storage_controller_consistent)


def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):
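The hunk above replaces a one-shot `consistency_check()` with a polled variant, since background reconciliation may still be settling when the node comes back online. A sketch of what a `wait_until(retries, interval, fn)` helper of that shape typically does — this is an assumption about the fixture's semantics, not its actual implementation:

```python
import time


def wait_until(retries: int, interval: float, fn):
    """Call `fn` until it stops raising, or give up after `retries` attempts."""
    last_exc = None
    for _ in range(retries):
        try:
            return fn()
        except Exception as e:  # test helper: retry on any assertion/error
            last_exc = e
            time.sleep(interval)
    raise last_exc
```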
@@ -6,7 +6,7 @@ import shutil
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING
from typing import Optional

import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
@@ -20,9 +20,6 @@ from fixtures.remote_storage import S3Storage, s3_storage
from fixtures.utils import wait_until
from fixtures.workload import Workload

if TYPE_CHECKING:
    from typing import Optional


@pytest.mark.parametrize("shard_count", [None, 4])
def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
@@ -479,9 +479,9 @@ def assert_size_approx_equal(size_a, size_b):
    """

    # Determined empirically from examples of equality failures: they differ
    # by page multiples of 8272, and usually by 1-3 pages. Tolerate 6 to avoid
    # by page multiples of 8272, and usually by 1-3 pages. Tolerate 4 to avoid
    # failing on outliers from that observed range.
    threshold = 6 * 8272
    threshold = 4 * 8272

    assert size_a == pytest.approx(size_b, abs=threshold)
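The hunk above tightens the tolerance from six pages to four, with one page being 8272 bytes here. For reference, `pytest.approx(x, abs=t)` accepts any value within `t` of `x`, so the worked arithmetic is:

```python
import pytest

PAGE = 8272
threshold = 4 * PAGE  # 33088 bytes of allowed drift

# Sizes within 4 pages of each other pass...
assert 1_000_000 == pytest.approx(1_000_000 + 3 * PAGE, abs=threshold)

# ...while a 5-page difference (41360 bytes) would now fail:
assert not (1_000_000 + 5 * PAGE == pytest.approx(1_000_000, abs=threshold))
```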
@@ -2,7 +2,6 @@ from __future__ import annotations

import concurrent.futures
import os
import threading
import time
from contextlib import closing
from datetime import datetime
@@ -11,7 +10,7 @@ from pathlib import Path

import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.common_types import Lsn, TenantId
from fixtures.log_helper import log
from fixtures.metrics import (
    PAGESERVER_GLOBAL_METRICS,
@@ -477,34 +476,3 @@ def test_pageserver_metrics_many_relations(neon_env_builder: NeonEnvBuilder):
    assert counts
    log.info(f"directory counts: {counts}")
    assert counts[2] > COUNT_AT_LEAST_EXPECTED


def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):
    """
    (Relaxed) regression test for issue that led to https://github.com/neondatabase/neon/pull/9268
    Create many endpoints in parallel and then restart them
    """
    env = neon_simple_env

    # This param needs to be 200+ to reproduce the limit issue
    n_threads = 16
    barrier = threading.Barrier(n_threads)

    def test_timeline(branch_name: str, timeline_id: TimelineId):
        endpoint = env.endpoints.create_start(branch_name)
        endpoint.stop()
        # Use a barrier to make sure we restart endpoints at the same time
        barrier.wait()
        endpoint.start()

    workers = []

    for i in range(0, n_threads):
        branch_name = f"branch_{i}"
        timeline_id = env.create_branch(branch_name)
        w = threading.Thread(target=test_timeline, args=[branch_name, timeline_id])
        workers.append(w)
        w.start()

    for w in workers:
        w.join()
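The test removed above relied on `threading.Barrier` so that every endpoint restarts at the same instant, which is what stressed the connection limit. A stripped-down sketch of that synchronization pattern, with plain functions standing in for the endpoint fixtures:

```python
import threading

N = 4
barrier = threading.Barrier(N)
results = []


def worker(i: int) -> None:
    # Every thread blocks here until all N have arrived, so the
    # work below starts simultaneously across threads.
    barrier.wait()
    results.append(i)  # list.append is atomic under the GIL


threads = [threading.Thread(target=worker, args=[i]) for i in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert sorted(results) == list(range(N))
```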