Compare commits

..

3 Commits

Author SHA1 Message Date
Vlad Lazar
e02334c15e fixup: doc reference to renamed field 2024-09-16 19:44:09 +01:00
Christian Schwarz
f0430e97a2 prove hypothesis (inefficient fix) 2024-09-16 17:17:49 +01:00
Vlad Lazar
25e31b247b tests: add unit test for vec read with overlapped images 2024-09-16 17:17:26 +01:00
236 changed files with 4495 additions and 12151 deletions

View File

@@ -13,7 +13,6 @@
# Directories
!.cargo/
!.config/
!compute/
!compute_tools/
!control_plane/
!libs/

View File

@@ -3,23 +3,19 @@ name: Prepare benchmarking databases by restoring dumps
on:
workflow_call:
# no inputs needed
defaults:
run:
shell: bash -euxo pipefail {0}
jobs:
setup-databases:
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
strategy:
fail-fast: false
matrix:
platform: [ aws-rds-postgres, aws-aurora-serverless-v2-postgres, neon ]
platform: [ aws-rds-postgres, aws-aurora-serverless-v2-postgres, neon ]
database: [ clickbench, tpch, userexample ]
env:
LD_LIBRARY_PATH: /tmp/neon/pg_install/v16/lib
PLATFORM: ${{ matrix.platform }}
@@ -27,10 +23,7 @@ jobs:
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init
steps:
@@ -39,13 +32,13 @@ jobs:
run: |
case "${PLATFORM}" in
neon)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
;;
aws-rds-postgres)
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
;;
aws-aurora-serverless-v2-postgres)
CONNSTR=${{ secrets.BENCHMARK_RDS_AURORA_CONNSTR }}
CONNSTR=${{ secrets.BENCHMARK_RDS_AURORA_CONNSTR }}
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}"
@@ -53,17 +46,10 @@ jobs:
;;
esac
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
@@ -71,23 +57,23 @@ jobs:
path: /tmp/neon/
prefix: latest
# we create a table that has one row for each database that we want to restore with the status whether the restore is done
# we create a table that has one row for each database that we want to restore with the status whether the restore is done
- name: Create benchmark_restore_status table if it does not exist
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-prep-connstr.outputs.connstr }}
DATABASE_NAME: ${{ matrix.database }}
# to avoid a race condition of multiple jobs trying to create the table at the same time,
# to avoid a race condition of multiple jobs trying to create the table at the same time,
# we use an advisory lock
run: |
${PG_BINARIES}/psql "${{ env.BENCHMARK_CONNSTR }}" -c "
SELECT pg_advisory_lock(4711);
SELECT pg_advisory_lock(4711);
CREATE TABLE IF NOT EXISTS benchmark_restore_status (
databasename text primary key,
restore_done boolean
);
SELECT pg_advisory_unlock(4711);
"
- name: Check if restore is already done
id: check-restore-done
env:
@@ -121,7 +107,7 @@ jobs:
DATABASE_NAME: ${{ matrix.database }}
run: |
mkdir -p /tmp/dumps
aws s3 cp s3://neon-github-dev/performance/pgdumps/$DATABASE_NAME/$DATABASE_NAME.pg_dump /tmp/dumps/
aws s3 cp s3://neon-github-dev/performance/pgdumps/$DATABASE_NAME/$DATABASE_NAME.pg_dump /tmp/dumps/
- name: Replace database name in connection string
if: steps.check-restore-done.outputs.skip != 'true'
@@ -140,17 +126,17 @@ jobs:
else
new_connstr="${base_connstr}/${DATABASE_NAME}"
fi
echo "database_connstr=${new_connstr}" >> $GITHUB_OUTPUT
echo "database_connstr=${new_connstr}" >> $GITHUB_OUTPUT
- name: Restore dump
if: steps.check-restore-done.outputs.skip != 'true'
env:
DATABASE_NAME: ${{ matrix.database }}
DATABASE_CONNSTR: ${{ steps.replace-dbname.outputs.database_connstr }}
# the following works only with larger computes:
# the following works only with larger computes:
# PGOPTIONS: "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7"
# we add the || true because:
# the dumps were created with Neon and contain neon extensions that are not
# the dumps were created with Neon and contain neon extensions that are not
# available in RDS, so we will always report an error, but we can ignore it
run: |
${PG_BINARIES}/pg_restore --clean --if-exists --no-owner --jobs=4 \

View File

@@ -236,7 +236,9 @@ jobs:
# run pageserver tests with different settings
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
for io_buffer_alignment in 0 1 512 ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT=$io_buffer_alignment ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
done
done
# Run separate tests for real S3
@@ -255,15 +257,7 @@ jobs:
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
- name: Install postgres binaries
run: |
# Use tar to copy files matching the pattern, preserving the paths in the destionation
tar c \
pg_install/v* \
pg_install/build/*/src/test/regress/*.so \
pg_install/build/*/src/test/regress/pg_regress \
pg_install/build/*/src/test/isolation/isolationtester \
pg_install/build/*/src/test/isolation/pg_isolation_regress \
| tar x -C /tmp/neon
run: cp -a pg_install /tmp/neon/pg_install
- name: Upload Neon artifact
uses: ./.github/actions/upload

View File

@@ -52,5 +52,5 @@ jobs:
for image in ${images}; do
docker buildx imagetools create \
-t ${{ inputs.registry_name }}.azurecr.io/neondatabase/${image}:${{ inputs.image_tag }} \
neondatabase/${image}:${{ inputs.image_tag }}
neondatabase/${image}:${{ inputs.image_tag }}
done

View File

@@ -12,6 +12,7 @@ on:
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 3 * * *' # run once a day, timezone is utc
workflow_dispatch: # adds ability to run this manually
inputs:
region_id:
@@ -58,7 +59,7 @@ jobs:
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
id-token: write # Required for OIDC authentication in azure runners
strategy:
fail-fast: false
matrix:
@@ -67,10 +68,12 @@ jobs:
PLATFORM: "neon-staging"
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
RUNNER: [ self-hosted, us-east-2, x64 ]
IMAGE: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
- DEFAULT_PG_VERSION: 16
PLATFORM: "azure-staging"
region_id: 'azure-eastus2'
RUNNER: [ self-hosted, eastus2, x64 ]
IMAGE: neondatabase/build-tools:pinned
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
@@ -83,10 +86,7 @@ jobs:
runs-on: ${{ matrix.RUNNER }}
container:
image: neondatabase/build-tools:pinned
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
image: ${{ matrix.IMAGE }}
options: --init
steps:
@@ -164,10 +164,6 @@ jobs:
replication-tests:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 16
@@ -178,21 +174,12 @@ jobs:
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
@@ -280,7 +267,7 @@ jobs:
region_id_default=${{ env.DEFAULT_REGION_ID }}
runner_default='["self-hosted", "us-east-2", "x64"]'
runner_azure='["self-hosted", "eastus2", "x64"]'
image_default="neondatabase/build-tools:pinned"
image_default="369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned"
matrix='{
"pg_version" : [
16
@@ -357,7 +344,7 @@ jobs:
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
id-token: write # Required for OIDC authentication in azure runners
strategy:
fail-fast: false
@@ -384,7 +371,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
- name: Configure AWS credentials # necessary on Azure runners
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
@@ -505,15 +492,17 @@ jobs:
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
id-token: write # Required for OIDC authentication in azure runners
strategy:
fail-fast: false
matrix:
include:
- PLATFORM: "neonvm-captest-pgvector"
RUNNER: [ self-hosted, us-east-2, x64 ]
IMAGE: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
- PLATFORM: "azure-captest-pgvector"
RUNNER: [ self-hosted, eastus2, x64 ]
IMAGE: neondatabase/build-tools:pinned
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "15m"
@@ -522,16 +511,13 @@ jobs:
DEFAULT_PG_VERSION: 16
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
LD_LIBRARY_PATH: /home/nonroot/pg/usr/lib/x86_64-linux-gnu
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.PLATFORM }}
runs-on: ${{ matrix.RUNNER }}
container:
image: neondatabase/build-tools:pinned
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
image: ${{ matrix.IMAGE }}
options: --init
steps:
@@ -541,26 +527,17 @@ jobs:
# instead of using Neon artifacts containing pgbench
- name: Install postgresql-16 where pytest expects it
run: |
# Just to make it easier to test things locally on macOS (with arm64)
arch=$(uname -m | sed 's/x86_64/amd64/g' | sed 's/aarch64/arm64/g')
cd /home/nonroot
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-17/libpq5_17.0-1.pgdg110+1_${arch}.deb"
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-client-16_16.4-1.pgdg110+2_${arch}.deb"
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-16_16.4-1.pgdg110+2_${arch}.deb"
dpkg -x libpq5_17.0-1.pgdg110+1_${arch}.deb pg
dpkg -x postgresql-16_16.4-1.pgdg110+2_${arch}.deb pg
dpkg -x postgresql-client-16_16.4-1.pgdg110+2_${arch}.deb pg
wget -q https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/libpq5_16.4-1.pgdg110%2B1_amd64.deb
wget -q https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-client-16_16.4-1.pgdg110%2B1_amd64.deb
wget -q https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-16_16.4-1.pgdg110%2B1_amd64.deb
dpkg -x libpq5_16.4-1.pgdg110+1_amd64.deb pg
dpkg -x postgresql-client-16_16.4-1.pgdg110+1_amd64.deb pg
dpkg -x postgresql-16_16.4-1.pgdg110+1_amd64.deb pg
mkdir -p /tmp/neon/pg_install/v16/bin
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/pgbench /tmp/neon/pg_install/v16/bin/pgbench
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/psql /tmp/neon/pg_install/v16/bin/psql
ln -s /home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu /tmp/neon/pg_install/v16/lib
LD_LIBRARY_PATH="/home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu:${LD_LIBRARY_PATH:-}"
export LD_LIBRARY_PATH
echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}" >> ${GITHUB_ENV}
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/pgbench /tmp/neon/pg_install/v16/bin/pgbench
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/psql /tmp/neon/pg_install/v16/bin/psql
ln -s /home/nonroot/pg/usr/lib/x86_64-linux-gnu /tmp/neon/pg_install/v16/lib
/tmp/neon/pg_install/v16/bin/pgbench --version
/tmp/neon/pg_install/v16/bin/psql --version
@@ -582,7 +559,7 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
- name: Configure AWS credentials
- name: Configure AWS credentials # necessary on Azure runners to read/write from/to S3
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
@@ -643,10 +620,6 @@ jobs:
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
needs: [ generate-matrices, pgbench-compare, prepare_AWS_RDS_databases ]
strategy:
@@ -665,22 +638,12 @@ jobs:
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
@@ -751,10 +714,6 @@ jobs:
#
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
needs: [ generate-matrices, clickbench-compare, prepare_AWS_RDS_databases ]
strategy:
@@ -772,22 +731,12 @@ jobs:
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
@@ -857,10 +806,6 @@ jobs:
user-examples-compare:
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
needs: [ generate-matrices, tpch-compare, prepare_AWS_RDS_databases ]
strategy:
@@ -877,22 +822,12 @@ jobs:
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:

View File

@@ -54,8 +54,8 @@ jobs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
@@ -120,59 +120,6 @@ jobs:
- name: Run mypy to check types
run: poetry run mypy .
# Check that the vendor/postgres-* submodules point to the
# corresponding REL_*_STABLE_neon branches.
check-submodules:
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
- uses: dorny/paths-filter@v3
id: check-if-submodules-changed
with:
filters: |
vendor:
- 'vendor/**'
- name: Check vendor/postgres-v14 submodule reference
if: steps.check-if-submodules-changed.outputs.vendor == 'true'
uses: jtmullen/submodule-branch-check-action@v1
with:
path: "vendor/postgres-v14"
fetch_depth: "50"
sub_fetch_depth: "50"
pass_if_unchanged: true
- name: Check vendor/postgres-v15 submodule reference
if: steps.check-if-submodules-changed.outputs.vendor == 'true'
uses: jtmullen/submodule-branch-check-action@v1
with:
path: "vendor/postgres-v15"
fetch_depth: "50"
sub_fetch_depth: "50"
pass_if_unchanged: true
- name: Check vendor/postgres-v16 submodule reference
if: steps.check-if-submodules-changed.outputs.vendor == 'true'
uses: jtmullen/submodule-branch-check-action@v1
with:
path: "vendor/postgres-v16"
fetch_depth: "50"
sub_fetch_depth: "50"
pass_if_unchanged: true
- name: Check vendor/postgres-v17 submodule reference
if: steps.check-if-submodules-changed.outputs.vendor == 'true'
uses: jtmullen/submodule-branch-check-action@v1
with:
path: "vendor/postgres-v17"
fetch_depth: "50"
sub_fetch_depth: "50"
pass_if_unchanged: true
check-codestyle-rust:
needs: [ check-permissions, build-build-tools-image ]
strategy:
@@ -212,10 +159,6 @@ jobs:
# This will catch compiler & clippy warnings in all feature combinations.
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
# NB: keep clippy args in sync with ./run_clippy.sh
#
# The only difference between "clippy --debug" and "clippy --release" is that in --release mode,
# #[cfg(debug_assertions)] blocks are not built. It's not worth building everything for second
# time just for that, so skip "clippy --release".
- run: |
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
@@ -225,6 +168,8 @@ jobs:
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Run cargo clippy (release)
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
- name: Check documentation generation
run: cargo doc --workspace --no-deps --document-private-items
@@ -412,7 +357,6 @@ jobs:
})
coverage-report:
if: ${{ !startsWith(github.ref_name, 'release') }}
needs: [ check-permissions, build-build-tools-image, build-and-test-locally ]
runs-on: [ self-hosted, small ]
container:
@@ -429,8 +373,8 @@ jobs:
coverage-html: ${{ steps.upload-coverage-report-new.outputs.report-url }}
coverage-json: ${{ steps.upload-coverage-report-new.outputs.summary-json }}
steps:
# Need `fetch-depth: 0` for differential coverage (to get diff between two commits)
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 0
@@ -531,9 +475,11 @@ jobs:
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
steps:
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 0
- uses: ./.github/actions/set-docker-config-dir
- uses: docker/setup-buildx-action@v3
@@ -602,28 +548,17 @@ jobs:
strategy:
fail-fast: false
matrix:
version:
# Much data was already generated on old PG versions with bullseye's
# libraries, the locales of which can cause data incompatibilities.
# However, new PG versions should check if they can be built on newer
# images, as that reduces the support burden of old and ancient
# distros.
- pg: v14
debian: bullseye-slim
- pg: v15
debian: bullseye-slim
- pg: v16
debian: bullseye-slim
- pg: v17
debian: bookworm-slim
version: [ v14, v15, v16, v17 ]
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
steps:
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 0
- uses: ./.github/actions/set-docker-config-dir
- uses: docker/setup-buildx-action@v3
@@ -658,46 +593,41 @@ jobs:
context: .
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
PG_VERSION=${{ matrix.version }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
DEBIAN_FLAVOR=${{ matrix.version.debian }}
provenance: false
push: true
pull: true
file: compute/Dockerfile.compute-node
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }}
file: Dockerfile.compute-node
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }}
tags: |
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
- name: Build neon extensions test image
if: matrix.version.pg == 'v16'
if: matrix.version == 'v16'
uses: docker/build-push-action@v6
with:
context: .
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
PG_VERSION=${{ matrix.version }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
DEBIAN_FLAVOR=${{ matrix.version.debian }}
provenance: false
push: true
pull: true
file: compute/Dockerfile.compute-node
file: Dockerfile.compute-node
target: neon-pg-ext-test
cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }}
cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version }}:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }}
tags: |
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }}
neondatabase/neon-test-extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }}
- name: Build compute-tools image
# compute-tools are Postgres independent, so build it only once
# We pick 16, because that builds on debian 11 with older glibc (and is
# thus compatible with newer glibc), rather than 17 on Debian 12, as
# that isn't guaranteed to be compatible with Debian 11
if: matrix.version.pg == 'v16'
if: matrix.version == 'v17'
uses: docker/build-push-action@v6
with:
target: compute-tools-image
@@ -706,11 +636,10 @@ jobs:
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
DEBIAN_FLAVOR=${{ matrix.version.debian }}
provenance: false
push: true
pull: true
file: compute/Dockerfile.compute-node
file: Dockerfile.compute-node
tags: |
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
@@ -773,10 +702,13 @@ jobs:
matrix:
version: [ v14, v15, v16, v17 ]
env:
VM_BUILDER_VERSION: v0.35.0
VM_BUILDER_VERSION: v0.29.3
steps:
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Downloading vm-builder
run: |
@@ -798,7 +730,7 @@ jobs:
- name: Build vm image
run: |
./vm-builder \
-spec=compute/vm-image-spec.yaml \
-spec=vm-image-spec.yaml \
-src=neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
-dst=neondatabase/vm-compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}
@@ -816,7 +748,10 @@ jobs:
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}
steps:
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: ./.github/actions/set-docker-config-dir
- uses: docker/login-action@v3
@@ -862,9 +797,6 @@ jobs:
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04
permissions:
id-token: write # for `aws-actions/configure-aws-credentials`
env:
VERSIONS: v14 v15 v16 v17
@@ -909,19 +841,13 @@ jobs:
docker buildx imagetools create -t neondatabase/neon-test-extensions-v16:latest \
neondatabase/neon-test-extensions-v16:${{ needs.tag.outputs.build-tag }}
- name: Configure AWS-prod credentials
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
mask-aws-account-id: true
role-to-assume: ${{ secrets.PROD_GHA_OIDC_ROLE }}
- name: Login to prod ECR
uses: docker/login-action@v3
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
with:
registry: 093970136003.dkr.ecr.eu-central-1.amazonaws.com
username: ${{ secrets.PROD_GHA_RUNNER_LIMITED_AWS_ACCESS_KEY_ID }}
password: ${{ secrets.PROD_GHA_RUNNER_LIMITED_AWS_SECRET_ACCESS_KEY }}
- name: Copy all images to prod ECR
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
@@ -1031,7 +957,6 @@ jobs:
deploy:
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait, push-to-acr-dev, push-to-acr-prod ]
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `push-to-acr-dev` and `push-to-acr-prod`
if: (github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy') && !failure() && !cancelled()
runs-on: [ self-hosted, small ]
@@ -1051,7 +976,10 @@ jobs:
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Trigger deploy workflow
env:
@@ -1130,8 +1058,7 @@ jobs:
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
promote-compatibility-data:
needs: [ deploy ]
# `!failure() && !cancelled()` is required because the workflow transitively depends on the job that can be skipped: `push-to-acr-dev` and `push-to-acr-prod`
if: github.ref_name == 'release' && !failure() && !cancelled()
if: github.ref_name == 'release'
runs-on: ubuntu-22.04
steps:
@@ -1190,9 +1117,10 @@ jobs:
files_to_promote+=("s3://${BUCKET}/${s3_key}")
for pg_version in v14 v15 v16 v17; do
# TODO Add v17
for pg_version in v14 v15 v16; do
# We run less tests for debug builds, so we don't need to promote them
if [ "${build_type}" == "debug" ] && { [ "${arch}" == "ARM64" ] || [ "${pg_version}" != "v17" ] ; }; then
if [ "${build_type}" == "debug" ] && { [ "${arch}" == "ARM64" ] || [ "${pg_version}" != "v16" ] ; }; then
continue
fi

View File

@@ -1,102 +0,0 @@
name: Cloud Regression Test
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '45 1 * * *' # run once a day, timezone is utc
workflow_dispatch: # adds ability to run this manually
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow
group: ${{ github.workflow }}
cancel-in-progress: true
jobs:
regress:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 16
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
runs-on: us-east-2
container:
image: neondatabase/build-tools:pinned
options: --init
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Patch the test
run: |
cd "vendor/postgres-v${DEFAULT_PG_VERSION}"
patch -p1 < "../../compute/patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch"
- name: Generate a random password
id: pwgen
run: |
set +x
DBPASS=$(dd if=/dev/random bs=48 count=1 2>/dev/null | base64)
echo "::add-mask::${DBPASS//\//}"
echo DBPASS="${DBPASS//\//}" >> "${GITHUB_OUTPUT}"
- name: Change tests according to the generated password
env:
DBPASS: ${{ steps.pwgen.outputs.DBPASS }}
run: |
cd vendor/postgres-v"${DEFAULT_PG_VERSION}"/src/test/regress
for fname in sql/*.sql expected/*.out; do
sed -i.bak s/NEON_PASSWORD_PLACEHOLDER/"'${DBPASS}'"/ "${fname}"
done
for ph in $(grep NEON_MD5_PLACEHOLDER expected/password.out | awk '{print $3;}' | sort | uniq); do
USER=$(echo "${ph}" | cut -c 22-)
MD5=md5$(echo -n "${DBPASS}${USER}" | md5sum | awk '{print $1;}')
sed -i.bak "s/${ph}/${MD5}/" expected/password.out
done
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Run the regression tests
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: cloud_regress
pg_version: ${{ env.DEFAULT_PG_VERSION }}
extra_params: -m remote_cluster
env:
BENCHMARK_CONNSTR: ${{ secrets.PG_REGRESS_CONNSTR }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # on-call-staging-stream
slack-message: |
Periodic pg_regress on staging: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -34,8 +34,8 @@ jobs:
build-tag: ${{ steps.build-tag.outputs.tag }}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
@@ -102,12 +102,12 @@ jobs:
# Default set of platforms to run e2e tests on
platforms='["docker", "k8s"]'
# If the PR changes vendor/, pgxn/ or libs/vm_monitor/ directories, or compute/Dockerfile.compute-node, add k8s-neonvm to the list of platforms.
# If the PR changes vendor/, pgxn/ or libs/vm_monitor/ directories, or Dockerfile.compute-node, add k8s-neonvm to the list of platforms.
# If the workflow run is not a pull request, add k8s-neonvm to the list.
if [ "$GITHUB_EVENT_NAME" == "pull_request" ]; then
for f in $(gh api "/repos/${GITHUB_REPOSITORY}/pulls/${PR_NUMBER}/files" --paginate --jq '.[].filename'); do
case "$f" in
vendor/*|pgxn/*|libs/vm_monitor/*|compute/Dockerfile.compute-node)
vendor/*|pgxn/*|libs/vm_monitor/*|Dockerfile.compute-node)
platforms=$(echo "${platforms}" | jq --compact-output '. += ["k8s-neonvm"] | unique')
;;
*)

461
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -76,6 +76,8 @@ clap = { version = "4.0", features = ["derive"] }
comfy-table = "7.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-deque = "0.8.5"
crossbeam-utils = "0.8.5"
dashmap = { version = "5.5.0", features = ["raw-api"] }
either = "1.8"
enum-map = "2.4.2"
@@ -93,7 +95,7 @@ hdrhistogram = "7.5.2"
hex = "0.4"
hex-literal = "0.4"
hmac = "0.12.1"
hostname = "0.4"
hostname = "0.3.1"
http = {version = "1.1.0", features = ["std"]}
http-types = { version = "2", default-features = false }
humantime = "2.1"
@@ -102,6 +104,7 @@ hyper = "0.14"
tokio-tungstenite = "0.20.0"
indexmap = "2"
indoc = "2"
inotify = "0.10.2"
ipnet = "2.9.0"
itertools = "0.10"
jsonwebtoken = "9"
@@ -110,7 +113,7 @@ libc = "0.2"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
memoffset = "0.9"
memoffset = "0.8"
nix = { version = "0.27", features = ["dir", "fs", "process", "socket", "signal", "poll"] }
notify = "6.0.0"
num_cpus = "1.15"
@@ -139,6 +142,7 @@ rpds = "0.13"
rustc-hash = "1.1.0"
rustls = "0.22"
rustls-pemfile = "2"
rustls-split = "0.3"
scopeguard = "1.1"
sysinfo = "0.29.2"
sd-notify = "0.4.1"
@@ -160,6 +164,7 @@ strum_macros = "0.26"
svg_fmt = "0.4.3"
sync_wrapper = "0.1.2"
tar = "0.4"
task-local-extensions = "0.1.4"
test-context = "0.3"
thiserror = "1.0"
tikv-jemallocator = "0.5"

View File

@@ -13,9 +13,6 @@ RUN useradd -ms /bin/bash nonroot -b /home
SHELL ["/bin/bash", "-c"]
# System deps
#
# 'gdb' is included so that we get backtraces of core dumps produced in
# regression tests
RUN set -e \
&& apt update \
&& apt install -y \
@@ -27,7 +24,6 @@ RUN set -e \
cmake \
curl \
flex \
gdb \
git \
gnupg \
gzip \

View File

@@ -3,15 +3,13 @@ ARG REPOSITORY=neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG BUILD_TAG
ARG DEBIAN_FLAVOR=bullseye-slim
#########################################################################################
#
# Layer "build-deps"
#
#########################################################################################
FROM debian:$DEBIAN_FLAVOR AS build-deps
ARG DEBIAN_FLAVOR
FROM debian:bullseye-slim AS build-deps
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
@@ -57,27 +55,22 @@ RUN cd postgres && \
# We could add the additional grant statements to the postgres repository but it would be hard to maintain,
# whenever we need to pick up a new postgres version and we want to limit the changes in our postgres fork,
# so we do it here.
old_list="pg_stat_statements--1.0--1.1.sql pg_stat_statements--1.1--1.2.sql pg_stat_statements--1.2--1.3.sql pg_stat_statements--1.3--1.4.sql pg_stat_statements--1.4--1.5.sql pg_stat_statements--1.4.sql pg_stat_statements--1.5--1.6.sql"; \
# the first loop is for pg_stat_statement extension version <= 1.6
for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql; do \
filename=$(basename "$file"); \
# Note that there are no downgrade scripts for pg_stat_statements, so we \
# don't have to modify any downgrade paths or (much) older versions: we only \
# have to make sure every creation of the pg_stat_statements_reset function \
# also adds execute permissions to the neon_superuser.
case $filename in \
pg_stat_statements--1.4.sql) \
# pg_stat_statements_reset is first created with 1.4
if echo "$old_list" | grep -q -F "$filename"; then \
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO neon_superuser;' >> $file; \
;; \
pg_stat_statements--1.6--1.7.sql) \
# Then with the 1.6-1.7 migration it is re-created with a new signature, thus add the permissions back
fi; \
done; \
# the second loop is for pg_stat_statement extension versions >= 1.7,
# where pg_stat_statement_reset() got 3 additional arguments
for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql; do \
filename=$(basename "$file"); \
if ! echo "$old_list" | grep -q -F "$filename"; then \
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO neon_superuser;' >> $file; \
;; \
pg_stat_statements--1.10--1.11.sql) \
# Then with the 1.10-1.11 migration it is re-created with a new signature again, thus add the permissions back
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) TO neon_superuser;' >> $file; \
;; \
esac; \
done;
fi; \
done
#########################################################################################
#
@@ -282,7 +275,7 @@ FROM build-deps AS vector-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY compute/patches/pgvector.patch /pgvector.patch
COPY patches/pgvector.patch /pgvector.patch
# By default, pgvector Makefile uses `-march=native`. We don't want that,
# because we build the images on different machines than where we run them.
@@ -368,7 +361,7 @@ FROM build-deps AS rum-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY compute/patches/rum.patch /rum.patch
COPY patches/rum.patch /rum.patch
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
@@ -1029,47 +1022,10 @@ RUN cd compute_tools && mold -run cargo build --locked --profile release-line-de
#
#########################################################################################
FROM debian:$DEBIAN_FLAVOR AS compute-tools-image
ARG DEBIAN_FLAVOR
FROM debian:bullseye-slim AS compute-tools-image
COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
#########################################################################################
#
# Layer "pgbouncer"
#
#########################################################################################
FROM debian:$DEBIAN_FLAVOR AS pgbouncer
ARG DEBIAN_FLAVOR
RUN set -e \
&& apt-get update \
&& apt-get install -y \
build-essential \
git \
libevent-dev \
libtool \
pkg-config
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
RUN set -e \
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
&& cd pgbouncer \
&& ./autogen.sh \
&& LDFLAGS=-static ./configure --prefix=/usr/local/pgbouncer --without-openssl \
&& make -j $(nproc) dist_man_MANS= \
&& make install dist_man_MANS=
#########################################################################################
#
# Layers "postgres-exporter" and "sql-exporter"
#
#########################################################################################
FROM quay.io/prometheuscommunity/postgres-exporter:v0.12.1 AS postgres-exporter
FROM burningalchemist/sql_exporter:0.13 AS sql-exporter
#########################################################################################
#
# Clean up postgres folder before inclusion
@@ -1117,7 +1073,7 @@ COPY --from=pgjwt-pg-build /pgjwt.tar.gz /ext-src
COPY --from=hypopg-pg-build /hypopg.tar.gz /ext-src
COPY --from=pg-hashids-pg-build /pg_hashids.tar.gz /ext-src
COPY --from=rum-pg-build /rum.tar.gz /ext-src
COPY compute/patches/rum.patch /ext-src
COPY patches/rum.patch /ext-src
#COPY --from=pgtap-pg-build /pgtap.tar.gz /ext-src
COPY --from=ip4r-pg-build /ip4r.tar.gz /ext-src
COPY --from=prefix-pg-build /prefix.tar.gz /ext-src
@@ -1125,9 +1081,9 @@ COPY --from=hll-pg-build /hll.tar.gz /ext-src
COPY --from=plpgsql-check-pg-build /plpgsql_check.tar.gz /ext-src
#COPY --from=timescaledb-pg-build /timescaledb.tar.gz /ext-src
COPY --from=pg-hint-plan-pg-build /pg_hint_plan.tar.gz /ext-src
COPY compute/patches/pg_hint_plan.patch /ext-src
COPY patches/pg_hint_plan.patch /ext-src
COPY --from=pg-cron-pg-build /pg_cron.tar.gz /ext-src
COPY compute/patches/pg_cron.patch /ext-src
COPY patches/pg_cron.patch /ext-src
#COPY --from=pg-pgx-ulid-build /home/nonroot/pgx_ulid.tar.gz /ext-src
#COPY --from=rdkit-pg-build /rdkit.tar.gz /ext-src
COPY --from=pg-uuidv7-pg-build /pg_uuidv7.tar.gz /ext-src
@@ -1136,7 +1092,7 @@ COPY --from=pg-semver-pg-build /pg_semver.tar.gz /ext-src
#COPY --from=pg-embedding-pg-build /home/nonroot/pg_embedding-src/ /ext-src
#COPY --from=wal2json-pg-build /wal2json_2_5.tar.gz /ext-src
COPY --from=pg-anon-pg-build /pg_anon.tar.gz /ext-src
COPY compute/patches/pg_anon.patch /ext-src
COPY patches/pg_anon.patch /ext-src
COPY --from=pg-ivm-build /pg_ivm.tar.gz /ext-src
COPY --from=pg-partman-build /pg_partman.tar.gz /ext-src
RUN case "${PG_VERSION}" in "v17") \
@@ -1183,9 +1139,7 @@ ENV PGDATABASE=postgres
# Put it all together into the final image
#
#########################################################################################
FROM debian:$DEBIAN_FLAVOR
ARG DEBIAN_FLAVOR
ENV DEBIAN_FLAVOR=$DEBIAN_FLAVOR
FROM debian:bullseye-slim
# Add user postgres
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
echo "postgres:test_console_pass" | chpasswd && \
@@ -1201,50 +1155,23 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
# pgbouncer and its config
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini
# Metrics exporter binaries and configuration files
COPY --from=postgres-exporter /bin/postgres_exporter /bin/postgres_exporter
COPY --from=sql-exporter /bin/sql_exporter /bin/sql_exporter
COPY --chmod=0644 compute/etc/sql_exporter.yml /etc/sql_exporter.yml
COPY --chmod=0644 compute/etc/neon_collector.yml /etc/neon_collector.yml
COPY --chmod=0644 compute/etc/sql_exporter_autoscaling.yml /etc/sql_exporter_autoscaling.yml
COPY --chmod=0644 compute/etc/neon_collector_autoscaling.yml /etc/neon_collector_autoscaling.yml
# Create remote extension download directory
RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/local/download_extensions
# Install:
# libreadline8 for psql
# libicu67, locales for collations (including ICU and plpgsql_check)
# liblz4-1 for lz4
# libossp-uuid16 for extension ossp-uuid
# libgeos, libsfcgal1, and libprotobuf-c1 for PostGIS
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libxml2, libxslt1.1 for xml2
# libzstd1 for zstd
# libboost* for rdkit
# ca-certificates for communicating with s3 by compute_ctl
RUN apt update && \
case $DEBIAN_FLAVOR in \
# Version-specific installs for Bullseye (PG14-PG16):
# libicu67, locales for collations (including ICU and plpgsql_check)
# libgdal28, libproj19 for PostGIS
bullseye*) \
VERSION_INSTALLS="libicu67 libgdal28 libproj19"; \
;; \
# Version-specific installs for Bookworm (PG17):
# libicu72, locales for collations (including ICU and plpgsql_check)
# libgdal32, libproj25 for PostGIS
bookworm*) \
VERSION_INSTALLS="libicu72 libgdal32 libproj25"; \
;; \
esac && \
RUN apt update && \
apt install --no-install-recommends -y \
gdb \
libicu67 \
liblz4-1 \
libreadline8 \
libboost-iostreams1.74.0 \
@@ -1253,6 +1180,8 @@ RUN apt update && \
libboost-system1.74.0 \
libossp-uuid16 \
libgeos-c1v5 \
libgdal28 \
libproj19 \
libprotobuf-c1 \
libsfcgal1 \
libxml2 \
@@ -1261,8 +1190,7 @@ RUN apt update && \
libcurl4-openssl-dev \
locales \
procps \
ca-certificates \
$VERSION_INSTALLS && \
ca-certificates && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -1,21 +0,0 @@
This directory contains files that are needed to build the compute
images, or included in the compute images.
Dockerfile.compute-node
To build the compute image
vm-image-spec.yaml
Instructions for vm-builder, to turn the compute-node image into
corresponding vm-compute-node image.
etc/
Configuration files included in /etc in the compute image
patches/
Some extensions need to be patched to work with Neon. This
directory contains such patches. They are applied to the extension
sources in Dockerfile.compute-node
In addition to these, postgres itself, the neon postgres extension,
and compute_ctl are built and copied into the compute image by
Dockerfile.compute-node.

View File

@@ -1,246 +0,0 @@
collector_name: neon_collector
metrics:
- metric_name: lfc_misses
type: gauge
help: 'lfc_misses'
key_labels:
values: [lfc_misses]
query: |
select lfc_value as lfc_misses from neon.neon_lfc_stats where lfc_key='file_cache_misses';
- metric_name: lfc_used
type: gauge
help: 'LFC chunks used (chunk = 1MB)'
key_labels:
values: [lfc_used]
query: |
select lfc_value as lfc_used from neon.neon_lfc_stats where lfc_key='file_cache_used';
- metric_name: lfc_hits
type: gauge
help: 'lfc_hits'
key_labels:
values: [lfc_hits]
query: |
select lfc_value as lfc_hits from neon.neon_lfc_stats where lfc_key='file_cache_hits';
- metric_name: lfc_writes
type: gauge
help: 'lfc_writes'
key_labels:
values: [lfc_writes]
query: |
select lfc_value as lfc_writes from neon.neon_lfc_stats where lfc_key='file_cache_writes';
- metric_name: lfc_cache_size_limit
type: gauge
help: 'LFC cache size limit in bytes'
key_labels:
values: [lfc_cache_size_limit]
query: |
select pg_size_bytes(current_setting('neon.file_cache_size_limit')) as lfc_cache_size_limit;
- metric_name: connection_counts
type: gauge
help: 'Connection counts'
key_labels:
- datname
- state
values: [count]
query: |
select datname, state, count(*) as count from pg_stat_activity where state <> '' group by datname, state;
- metric_name: pg_stats_userdb
type: gauge
help: 'Stats for several oldest non-system dbs'
key_labels:
- datname
value_label: kind
values:
- db_size
- deadlocks
# Rows
- inserted
- updated
- deleted
# We export stats for 10 non-system database. Without this limit
# it is too easy to abuse the system by creating lots of databases.
query: |
select pg_database_size(datname) as db_size, deadlocks,
tup_inserted as inserted, tup_updated as updated, tup_deleted as deleted,
datname
from pg_stat_database
where datname IN (
select datname
from pg_database
where datname <> 'postgres' and not datistemplate
order by oid
limit 10
);
- metric_name: max_cluster_size
type: gauge
help: 'neon.max_cluster_size setting'
key_labels:
values: [max_cluster_size]
query: |
select setting::int as max_cluster_size from pg_settings where name = 'neon.max_cluster_size';
- metric_name: db_total_size
type: gauge
help: 'Size of all databases'
key_labels:
values: [total]
query: |
select sum(pg_database_size(datname)) as total from pg_database;
# DEPRECATED
- metric_name: lfc_approximate_working_set_size
type: gauge
help: 'Approximate working set size in pages of 8192 bytes'
key_labels:
values: [approximate_working_set_size]
query: |
select neon.approximate_working_set_size(false) as approximate_working_set_size;
- metric_name: lfc_approximate_working_set_size_windows
type: gauge
help: 'Approximate working set size in pages of 8192 bytes'
key_labels: [duration]
values: [size]
# NOTE: This is the "public" / "human-readable" version. Here, we supply a small selection
# of durations in a pretty-printed form.
query: |
select
x as duration,
neon.approximate_working_set_size_seconds(extract('epoch' from x::interval)::int) as size
from
(values ('5m'),('15m'),('1h')) as t (x);
- metric_name: compute_current_lsn
type: gauge
help: 'Current LSN of the database'
key_labels:
values: [lsn]
query: |
select
case
when pg_catalog.pg_is_in_recovery()
then (pg_last_wal_replay_lsn() - '0/0')::FLOAT8
else (pg_current_wal_lsn() - '0/0')::FLOAT8
end as lsn;
- metric_name: compute_receive_lsn
type: gauge
help: 'Returns the last write-ahead log location that has been received and synced to disk by streaming replication'
key_labels:
values: [lsn]
query: |
SELECT
CASE
WHEN pg_catalog.pg_is_in_recovery()
THEN (pg_last_wal_receive_lsn() - '0/0')::FLOAT8
ELSE 0
END AS lsn;
- metric_name: replication_delay_bytes
type: gauge
help: 'Bytes between received and replayed LSN'
key_labels:
values: [replication_delay_bytes]
# We use a GREATEST call here because this calculation can be negative.
# The calculation is not atomic, meaning after we've gotten the receive
# LSN, the replay LSN may have advanced past the receive LSN we
# are using for the calculation.
query: |
SELECT GREATEST(0, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())) AS replication_delay_bytes;
- metric_name: replication_delay_seconds
type: gauge
help: 'Time since last LSN was replayed'
key_labels:
values: [replication_delay_seconds]
query: |
SELECT
CASE
WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST (0, EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()))
END AS replication_delay_seconds;
- metric_name: checkpoints_req
type: gauge
help: 'Number of requested checkpoints'
key_labels:
values: [checkpoints_req]
query: |
SELECT checkpoints_req FROM pg_stat_bgwriter;
- metric_name: checkpoints_timed
type: gauge
help: 'Number of scheduled checkpoints'
key_labels:
values: [checkpoints_timed]
query: |
SELECT checkpoints_timed FROM pg_stat_bgwriter;
- metric_name: compute_logical_snapshot_files
type: gauge
help: 'Number of snapshot files in pg_logical/snapshot'
key_labels:
- timeline_id
values: [num_logical_snapshot_files]
query: |
SELECT
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp. These
-- temporary snapshot files are renamed to the actual snapshot files after they are
-- completely built. We only WAL-log the completely built snapshot files.
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
# In all the below metrics, we cast LSNs to floats because Prometheus only supports floats.
# It's probably fine because float64 can store integers from -2^53 to +2^53 exactly.
# Number of slots is limited by max_replication_slots, so collecting position for all of them shouldn't be bad.
- metric_name: logical_slot_restart_lsn
type: gauge
help: 'restart_lsn of logical slots'
key_labels:
- slot_name
values: [restart_lsn]
query: |
select slot_name, (restart_lsn - '0/0')::FLOAT8 as restart_lsn
from pg_replication_slots
where slot_type = 'logical';
- metric_name: compute_subscriptions_count
type: gauge
help: 'Number of logical replication subscriptions grouped by enabled/disabled'
key_labels:
- enabled
values: [subscriptions_count]
query: |
select subenabled::text as enabled, count(*) as subscriptions_count
from pg_subscription
group by subenabled;
- metric_name: retained_wal
type: gauge
help: 'Retained WAL in inactive replication slots'
key_labels:
- slot_name
values: [retained_wal]
query: |
SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)::FLOAT8 AS retained_wal
FROM pg_replication_slots
WHERE active = false;
- metric_name: wal_is_lost
type: gauge
help: 'Whether or not the replication slot wal_status is lost'
key_labels:
- slot_name
values: [wal_is_lost]
query: |
SELECT slot_name,
CASE WHEN wal_status = 'lost' THEN 1 ELSE 0 END AS wal_is_lost
FROM pg_replication_slots;

View File

@@ -1,55 +0,0 @@
collector_name: neon_collector_autoscaling
metrics:
- metric_name: lfc_misses
type: gauge
help: 'lfc_misses'
key_labels:
values: [lfc_misses]
query: |
select lfc_value as lfc_misses from neon.neon_lfc_stats where lfc_key='file_cache_misses';
- metric_name: lfc_used
type: gauge
help: 'LFC chunks used (chunk = 1MB)'
key_labels:
values: [lfc_used]
query: |
select lfc_value as lfc_used from neon.neon_lfc_stats where lfc_key='file_cache_used';
- metric_name: lfc_hits
type: gauge
help: 'lfc_hits'
key_labels:
values: [lfc_hits]
query: |
select lfc_value as lfc_hits from neon.neon_lfc_stats where lfc_key='file_cache_hits';
- metric_name: lfc_writes
type: gauge
help: 'lfc_writes'
key_labels:
values: [lfc_writes]
query: |
select lfc_value as lfc_writes from neon.neon_lfc_stats where lfc_key='file_cache_writes';
- metric_name: lfc_cache_size_limit
type: gauge
help: 'LFC cache size limit in bytes'
key_labels:
values: [lfc_cache_size_limit]
query: |
select pg_size_bytes(current_setting('neon.file_cache_size_limit')) as lfc_cache_size_limit;
- metric_name: lfc_approximate_working_set_size_windows
type: gauge
help: 'Approximate working set size in pages of 8192 bytes'
key_labels: [duration_seconds]
values: [size]
# NOTE: This is the "internal" / "machine-readable" version. This outputs the working set
# size looking back 1..60 minutes, labeled with the number of minutes.
query: |
select
x::text as duration_seconds,
neon.approximate_working_set_size_seconds(x) as size
from
(select generate_series * 60 as x from generate_series(1, 60)) as t (x);

View File

@@ -1,17 +0,0 @@
[databases]
*=host=localhost port=5432 auth_user=cloud_admin
[pgbouncer]
listen_port=6432
listen_addr=0.0.0.0
auth_type=scram-sha-256
auth_user=cloud_admin
auth_dbname=postgres
client_tls_sslmode=disable
server_tls_sslmode=disable
pool_mode=transaction
max_client_conn=10000
default_pool_size=64
max_prepared_statements=0
admin_users=postgres
unix_socket_dir=/tmp/
unix_socket_mode=0777

View File

@@ -1,33 +0,0 @@
# Configuration for sql_exporter
# Global defaults.
global:
# If scrape_timeout <= 0, no timeout is set unless Prometheus provides one. The default is 10s.
scrape_timeout: 10s
# Subtracted from Prometheus' scrape_timeout to give us some headroom and prevent Prometheus from timing out first.
scrape_timeout_offset: 500ms
# Minimum interval between collector runs: by default (0s) collectors are executed on every scrape.
min_interval: 0s
# Maximum number of open connections to any one target. Metric queries will run concurrently on multiple connections,
# as will concurrent scrapes.
max_connections: 1
# Maximum number of idle connections to any one target. Unless you use very long collection intervals, this should
# always be the same as max_connections.
max_idle_connections: 1
# Maximum number of maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse.
# If 0, connections are not closed due to a connection's age.
max_connection_lifetime: 5m
# The target to monitor and the collectors to execute on it.
target:
# Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
# the schema gets dropped or replaced to match the driver expected DSN format.
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter'
# Collectors (referenced by name) to execute on the target.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
collectors: [neon_collector]
# Collector files specifies a list of globs. One collector definition is read from each matching file.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
collector_files:
- "neon_collector.yml"

View File

@@ -1,33 +0,0 @@
# Configuration for sql_exporter for autoscaling-agent
# Global defaults.
global:
# If scrape_timeout <= 0, no timeout is set unless Prometheus provides one. The default is 10s.
scrape_timeout: 10s
# Subtracted from Prometheus' scrape_timeout to give us some headroom and prevent Prometheus from timing out first.
scrape_timeout_offset: 500ms
# Minimum interval between collector runs: by default (0s) collectors are executed on every scrape.
min_interval: 0s
# Maximum number of open connections to any one target. Metric queries will run concurrently on multiple connections,
# as will concurrent scrapes.
max_connections: 1
# Maximum number of idle connections to any one target. Unless you use very long collection intervals, this should
# always be the same as max_connections.
max_idle_connections: 1
# Maximum number of maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse.
# If 0, connections are not closed due to a connection's age.
max_connection_lifetime: 5m
# The target to monitor and the collectors to execute on it.
target:
# Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
# the schema gets dropped or replaced to match the driver expected DSN format.
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling'
# Collectors (referenced by name) to execute on the target.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
collectors: [neon_collector_autoscaling]
# Collector files specifies a list of globs. One collector definition is read from each matching file.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
collector_files:
- "neon_collector_autoscaling.yml"

File diff suppressed because it is too large Load Diff

View File

@@ -1,117 +0,0 @@
# Supplemental file for neondatabase/autoscaling's vm-builder, for producing the VM compute image.
---
commands:
- name: cgconfigparser
user: root
sysvInitAction: sysinit
shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664'
# restrict permissions on /neonvm/bin/resize-swap, because we grant access to compute_ctl for
# running it as root.
- name: chmod-resize-swap
user: root
sysvInitAction: sysinit
shell: 'chmod 711 /neonvm/bin/resize-swap'
- name: chmod-set-disk-quota
user: root
sysvInitAction: sysinit
shell: 'chmod 711 /neonvm/bin/set-disk-quota'
- name: pgbouncer
user: postgres
sysvInitAction: respawn
shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter'
- name: sql-exporter
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter.yml -web.listen-address=:9399'
- name: sql-exporter-autoscaling
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
- filename: compute_ctl-sudoers
content: |
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
group neon-postgres {
perm {
admin {
uid = postgres;
}
task {
gid = users;
}
}
memory {}
}
build: |
# Build cgroup-tools
#
# At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically
# libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-monitor
# requires cgroup v2, so we'll build cgroup-tools ourselves.
FROM debian:bullseye-slim as libcgroup-builder
ENV LIBCGROUP_VERSION=v2.0.3
RUN set -exu \
&& apt update \
&& apt install --no-install-recommends -y \
git \
ca-certificates \
automake \
cmake \
make \
gcc \
byacc \
flex \
libtool \
libpam0g-dev \
&& git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \
&& INSTALL_DIR="/libcgroup-install" \
&& mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \
&& cd libcgroup \
# extracted from bootstrap.sh, with modified flags:
&& (test -d m4 || mkdir m4) \
&& autoreconf -fi \
&& rm -rf autom4te.cache \
&& CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \
# actually build the thing...
&& make install
merge: |
# tweak nofile limits
RUN set -e \
&& echo 'fs.file-max = 1048576' >>/etc/sysctl.conf \
&& test ! -e /etc/security || ( \
echo '* - nofile 1048576' >>/etc/security/limits.conf \
&& echo 'root - nofile 1048576' >>/etc/security/limits.conf \
)
# Allow postgres user (compute_ctl) to run swap resizer.
# Need to install sudo in order to allow this.
#
# Also, remove the 'read' permission from group/other on /neonvm/bin/resize-swap, just to be safe.
RUN set -e \
&& apt update \
&& apt install --no-install-recommends -y \
sudo \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
COPY compute_ctl-sudoers /etc/sudoers.d/compute_ctl-sudoers
COPY cgconfig.conf /etc/cgconfig.conf
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/

View File

@@ -11,6 +11,7 @@ testing = []
[dependencies]
anyhow.workspace = true
async-compression.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
@@ -23,6 +24,7 @@ num_cpus.workspace = true
opentelemetry.workspace = true
postgres.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
tar.workspace = true
@@ -41,6 +43,7 @@ url.workspace = true
compute_api.workspace = true
utils.workspace = true
workspace_hack.workspace = true
toml_edit.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
vm_monitor = { version = "0.1", path = "../libs/vm_monitor/" }
zstd = "0.13"

View File

@@ -44,7 +44,6 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
@@ -152,7 +151,6 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
let spec_json = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
let resize_swap_on_bind = matches.get_flag("resize-swap-on-bind");
let set_disk_quota_for_fs = matches.get_one::<String>("set-disk-quota-for-fs");
Ok(ProcessCliResult {
connstr,
@@ -163,7 +161,6 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
spec_json,
spec_path,
resize_swap_on_bind,
set_disk_quota_for_fs,
})
}
@@ -176,7 +173,6 @@ struct ProcessCliResult<'clap> {
spec_json: Option<&'clap String>,
spec_path: Option<&'clap String>,
resize_swap_on_bind: bool,
set_disk_quota_for_fs: Option<&'clap String>,
}
fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
@@ -297,7 +293,6 @@ fn wait_spec(
pgbin,
ext_remote_storage,
resize_swap_on_bind,
set_disk_quota_for_fs,
http_port,
..
}: ProcessCliResult,
@@ -378,7 +373,6 @@ fn wait_spec(
compute,
http_port,
resize_swap_on_bind,
set_disk_quota_for_fs: set_disk_quota_for_fs.cloned(),
})
}
@@ -387,7 +381,6 @@ struct WaitSpecResult {
// passed through from ProcessCliResult
http_port: u16,
resize_swap_on_bind: bool,
set_disk_quota_for_fs: Option<String>,
}
fn start_postgres(
@@ -397,7 +390,6 @@ fn start_postgres(
compute,
http_port,
resize_swap_on_bind,
set_disk_quota_for_fs,
}: WaitSpecResult,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
// We got all we need, update the state.
@@ -411,7 +403,6 @@ fn start_postgres(
);
// before we release the mutex, fetch the swap size (if any) for later.
let swap_size_bytes = state.pspec.as_ref().unwrap().spec.swap_size_bytes;
let disk_quota_bytes = state.pspec.as_ref().unwrap().spec.disk_quota_bytes;
drop(state);
// Launch remaining service threads
@@ -431,8 +422,8 @@ fn start_postgres(
// OOM-killed during startup because swap wasn't available yet.
match resize_swap(size_bytes) {
Ok(()) => {
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%size_bytes, %size_mib, "resized swap");
let size_gib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%size_bytes, %size_gib, "resized swap");
}
Err(err) => {
let err = err.context("failed to resize swap");
@@ -441,29 +432,10 @@ fn start_postgres(
// Mark compute startup as failed; don't try to start postgres, and report this
// error to the control plane when it next asks.
prestartup_failed = true;
compute.set_failed_status(err);
delay_exit = true;
}
}
}
// Set disk quota if the compute spec says so
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
(disk_quota_bytes, set_disk_quota_for_fs)
{
match set_disk_quota(disk_quota_bytes, &disk_quota_fs_mountpoint) {
Ok(()) => {
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");
}
Err(err) => {
let err = err.context("failed to set disk quota");
error!("{err:#}");
// Mark compute startup as failed; don't try to start postgres, and report this
// error to the control plane when it next asks.
prestartup_failed = true;
compute.set_failed_status(err);
let mut state = compute.state.lock().unwrap();
state.error = Some(format!("{err:?}"));
state.status = ComputeStatus::Failed;
compute.state_changed.notify_all();
delay_exit = true;
}
}
@@ -478,7 +450,16 @@ fn start_postgres(
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:#}", err);
compute.set_failed_status(err);
let mut state = compute.state.lock().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
// Notify others that Postgres failed to start. In case of configuring the
// empty compute, it's likely that API handler is still waiting for compute
// state change. With this we will notify it that compute is in Failed state,
// so control plane will know about it earlier and record proper error instead
// of timeout.
compute.state_changed.notify_all();
drop(state); // unlock
delay_exit = true;
None
}
@@ -769,11 +750,6 @@ fn cli() -> clap::Command {
.long("resize-swap-on-bind")
.action(clap::ArgAction::SetTrue),
)
.arg(
Arg::new("set-disk-quota-for-fs")
.long("set-disk-quota-for-fs")
.value_name("SET_DISK_QUOTA_FOR_FS")
)
}
/// When compute_ctl is killed, send also termination signal to sync-safekeepers

View File

@@ -10,7 +10,6 @@ use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex, RwLock};
use std::thread;
use std::time::Duration;
use std::time::Instant;
use anyhow::{Context, Result};
@@ -306,13 +305,6 @@ impl ComputeNode {
self.state_changed.notify_all();
}
pub fn set_failed_status(&self, err: anyhow::Error) {
let mut state = self.state.lock().unwrap();
state.error = Some(format!("{err:?}"));
state.status = ComputeStatus::Failed;
self.state_changed.notify_all();
}
pub fn get_status(&self) -> ComputeStatus {
self.state.lock().unwrap().status
}
@@ -718,7 +710,7 @@ impl ComputeNode {
info!("running initdb");
let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
Command::new(initdb_bin)
.args(["--pgdata", pgdata])
.args(["-D", pgdata])
.output()
.expect("cannot start initdb process");
@@ -1131,9 +1123,6 @@ impl ComputeNode {
//
// Use that as a default location and pattern, except macos where core dumps are written
// to /cores/ directory by default.
//
// With default Linux settings, the core dump file is called just "core", so check for
// that too.
pub fn check_for_core_dumps(&self) -> Result<()> {
let core_dump_dir = match std::env::consts::OS {
"macos" => Path::new("/cores/"),
@@ -1145,17 +1134,8 @@ impl ComputeNode {
let files = fs::read_dir(core_dump_dir)?;
let cores = files.filter_map(|entry| {
let entry = entry.ok()?;
let is_core_dump = match entry.file_name().to_str()? {
n if n.starts_with("core.") => true,
"core" => true,
_ => false,
};
if is_core_dump {
Some(entry.path())
} else {
None
}
let _ = entry.file_name().to_str()?.strip_prefix("core.")?;
Some(entry.path())
});
// Print backtrace for each core dump
@@ -1406,36 +1386,6 @@ LIMIT 100",
}
Ok(remote_ext_metrics)
}
/// Waits until current thread receives a state changed notification and
/// the pageserver connection strings has changed.
///
/// The operation will time out after a specified duration.
pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
let state = self.state.lock().unwrap();
let old_pageserver_connstr = state
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr
.clone();
let mut unchanged = true;
let _ = self
.state_changed
.wait_timeout_while(state, duration, |s| {
let pageserver_connstr = &s
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr;
unchanged = pageserver_connstr == &old_pageserver_connstr;
unchanged
})
.unwrap();
if !unchanged {
info!("Pageserver config changed");
}
}
}
pub fn forward_termination_signal() {

View File

@@ -11,17 +11,9 @@ use crate::compute::ComputeNode;
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let mut state = compute.state.lock().unwrap();
let state = compute.state.lock().unwrap();
let mut state = compute.state_changed.wait(state).unwrap();
// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
}
// Re-check the status after waking up
if state.status == ComputeStatus::ConfigurationPending {
info!("got configuration request");
state.status = ComputeStatus::Configuration;

View File

@@ -1,25 +0,0 @@
use anyhow::Context;
pub const DISK_QUOTA_BIN: &str = "/neonvm/bin/set-disk-quota";
/// If size_bytes is 0, it disables the quota. Otherwise, it sets filesystem quota to size_bytes.
/// `fs_mountpoint` should point to the mountpoint of the filesystem where the quota should be set.
pub fn set_disk_quota(size_bytes: u64, fs_mountpoint: &str) -> anyhow::Result<()> {
let size_kb = size_bytes / 1024;
// run `/neonvm/bin/set-disk-quota {size_kb} {mountpoint}`
let child_result = std::process::Command::new("/usr/bin/sudo")
.arg(DISK_QUOTA_BIN)
.arg(size_kb.to_string())
.arg(fs_mountpoint)
.spawn();
child_result
.context("spawn() failed")
.and_then(|mut child| child.wait().context("wait() failed"))
.and_then(|status| match status.success() {
true => Ok(()),
false => Err(anyhow::anyhow!("process exited with {status}")),
})
// wrap any prior error with the overall context that we couldn't run the command
.with_context(|| format!("could not run `/usr/bin/sudo {DISK_QUOTA_BIN}`"))
}

View File

@@ -10,7 +10,6 @@ pub mod http;
pub mod logger;
pub mod catalog;
pub mod compute;
pub mod disk_quota;
pub mod extension_server;
pub mod lsn_lease;
mod migration;

View File

@@ -57,10 +57,10 @@ fn lsn_lease_bg_task(
.max(valid_duration / 2);
info!(
"Request succeeded, sleeping for {} seconds",
"Succeeded, sleeping for {} seconds",
sleep_duration.as_secs()
);
compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
thread::sleep(sleep_duration);
}
}
@@ -89,7 +89,10 @@ fn acquire_lsn_lease_with_retry(
.map(|connstr| {
let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
if let Some(storage_auth_token) = &spec.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token.clone());
} else {
info!("Storage auth token not set");
}
config
})
@@ -105,11 +108,9 @@ fn acquire_lsn_lease_with_retry(
bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
}
Err(e) => {
warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
warn!("Failed to acquire lsn lease: {e} (attempt {attempts}");
compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
retry_period_ms as u64,
));
thread::sleep(Duration::from_millis(retry_period_ms as u64));
retry_period_ms *= 1.5;
retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
}

View File

@@ -1 +0,0 @@
GRANT EXECUTE ON FUNCTION pg_show_replication_origin_status TO neon_superuser;

View File

@@ -793,9 +793,6 @@ pub fn handle_migrations(client: &mut Client) -> Result<()> {
include_str!(
"./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
),
include_str!(
"./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
),
];
MigrationRunner::new(client, &migrations).run_migrations()?;

View File

@@ -10,9 +10,12 @@ camino.workspace = true
clap.workspace = true
comfy-table.workspace = true
futures.workspace = true
git-version.workspace = true
humantime.workspace = true
nix.workspace = true
once_cell.workspace = true
postgres.workspace = true
hex.workspace = true
humantime-serde.workspace = true
hyper.workspace = true
regex.workspace = true
@@ -20,6 +23,8 @@ reqwest = { workspace = true, features = ["blocking", "json"] }
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
tar.workspace = true
thiserror.workspace = true
toml.workspace = true
toml_edit.workspace = true

View File

@@ -151,7 +151,7 @@ where
print!(".");
io::stdout().flush().unwrap();
}
tokio::time::sleep(RETRY_INTERVAL).await;
thread::sleep(RETRY_INTERVAL);
}
Err(e) => {
println!("error starting process {process_name:?}: {e:#}");

File diff suppressed because it is too large Load Diff

View File

@@ -1,94 +0,0 @@
//! Branch mappings for convenience
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use anyhow::{bail, Context};
use serde::{Deserialize, Serialize};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
/// Keep human-readable aliases in memory (and persist them to config XXX), to hide tenant/timeline hex strings from the user.
#[derive(PartialEq, Eq, Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct BranchMappings {
/// Default tenant ID to use with the 'neon_local' command line utility, when
/// --tenant_id is not explicitly specified. This comes from the branches.
pub default_tenant_id: Option<TenantId>,
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
pub mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}
impl BranchMappings {
pub fn register_branch_mapping(
&mut self,
branch_name: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
let existing_values = self.mappings.entry(branch_name.clone()).or_default();
let existing_ids = existing_values
.iter()
.find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
if let Some((_, old_timeline_id)) = existing_ids {
if old_timeline_id == &timeline_id {
Ok(())
} else {
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
}
} else {
existing_values.push((tenant_id, timeline_id));
Ok(())
}
}
pub fn get_branch_timeline_id(
&self,
branch_name: &str,
tenant_id: TenantId,
) -> Option<TimelineId> {
// If it looks like a timeline ID, return it as it is
if let Ok(timeline_id) = branch_name.parse::<TimelineId>() {
return Some(timeline_id);
}
self.mappings
.get(branch_name)?
.iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
.map(|&(_, timeline_id)| timeline_id)
.map(TimelineId::from)
}
pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
self.mappings
.iter()
.flat_map(|(name, tenant_timelines)| {
tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
(TenantTimelineId::new(tenant_id, timeline_id), name.clone())
})
})
.collect()
}
pub fn persist(&self, path: &Path) -> anyhow::Result<()> {
let content = &toml::to_string_pretty(self)?;
fs::write(path, content).with_context(|| {
format!(
"Failed to write branch information into path '{}'",
path.display()
)
})
}
pub fn load(path: &Path) -> anyhow::Result<BranchMappings> {
let branches_file_contents = fs::read_to_string(path)?;
Ok(toml::from_str(branches_file_contents.as_str())?)
}
}

View File

@@ -561,7 +561,6 @@ impl Endpoint {
operation_uuid: None,
features: self.features.clone(),
swap_size_bytes: None,
disk_quota_bytes: None,
cluster: Cluster {
cluster_id: None, // project ID: not used
name: None, // project name: not used
@@ -703,7 +702,7 @@ impl Endpoint {
}
}
}
tokio::time::sleep(ATTEMPT_INTERVAL).await;
std::thread::sleep(ATTEMPT_INTERVAL);
}
// disarm the scopeguard, let the child outlive this function (and neon_local invoction)

View File

@@ -17,7 +17,9 @@ use std::time::Duration;
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use pageserver_api::models::{self, AuxFilePolicy, TenantInfo, TimelineInfo};
use pageserver_api::models::{
self, AuxFilePolicy, LocationConfig, TenantHistorySize, TenantInfo, TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
@@ -322,6 +324,22 @@ impl PageServerNode {
background_process::stop_process(immediate, "pageserver", &self.pid_file())
}
pub async fn page_server_psql_client(
&self,
) -> anyhow::Result<(
tokio_postgres::Client,
tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
)> {
let mut config = self.pg_connection_config.clone();
if self.conf.pg_auth_type == AuthType::NeonJWT {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
config = config.set_password(Some(token));
}
Ok(config.connect_no_tls().await?)
}
pub async fn check_status(&self) -> mgmt_api::Result<()> {
self.http_client.status().await
}
@@ -522,6 +540,19 @@ impl PageServerNode {
Ok(())
}
pub async fn location_config(
&self,
tenant_shard_id: TenantShardId,
config: LocationConfig,
flush_ms: Option<Duration>,
lazy: bool,
) -> anyhow::Result<()> {
Ok(self
.http_client
.location_config(tenant_shard_id, config, flush_ms, lazy)
.await?)
}
pub async fn timeline_list(
&self,
tenant_shard_id: &TenantShardId,
@@ -605,4 +636,14 @@ impl PageServerNode {
Ok(())
}
pub async fn tenant_synthetic_size(
&self,
tenant_shard_id: TenantShardId,
) -> anyhow::Result<TenantHistorySize> {
Ok(self
.http_client
.tenant_synthetic_size(tenant_shard_id)
.await?)
}
}

View File

@@ -4,10 +4,13 @@
/// NOTE: This doesn't implement the full, correct postgresql.conf syntax. Just
/// enough to extract a few settings we need in Neon, assuming you don't do
/// funny stuff like include-directives or funny escaping.
use anyhow::{bail, Context, Result};
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fmt;
use std::io::BufRead;
use std::str::FromStr;
/// In-memory representation of a postgresql.conf file
#[derive(Default, Debug)]
@@ -16,16 +19,84 @@ pub struct PostgresConf {
hash: HashMap<String, String>,
}
static CONF_LINE_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^((?:\w|\.)+)\s*=\s*(\S+)$").unwrap());
impl PostgresConf {
pub fn new() -> PostgresConf {
PostgresConf::default()
}
/// Read file into memory
pub fn read(read: impl std::io::Read) -> Result<PostgresConf> {
let mut result = Self::new();
for line in std::io::BufReader::new(read).lines() {
let line = line?;
// Store each line in a vector, in original format
result.lines.push(line.clone());
// Also parse each line and insert key=value lines into a hash map.
//
// FIXME: This doesn't match exactly the flex/bison grammar in PostgreSQL.
// But it's close enough for our usage.
let line = line.trim();
if line.starts_with('#') {
// comment, ignore
continue;
} else if let Some(caps) = CONF_LINE_RE.captures(line) {
let name = caps.get(1).unwrap().as_str();
let raw_val = caps.get(2).unwrap().as_str();
if let Ok(val) = deescape_str(raw_val) {
// Note: if there's already an entry in the hash map for
// this key, this will replace it. That's the behavior what
// we want; when PostgreSQL reads the file, each line
// overrides any previous value for the same setting.
result.hash.insert(name.to_string(), val.to_string());
}
}
}
Ok(result)
}
/// Return the current value of 'option'
pub fn get(&self, option: &str) -> Option<&str> {
self.hash.get(option).map(|x| x.as_ref())
}
/// Return the current value of a field, parsed to the right datatype.
///
/// This calls the FromStr::parse() function on the value of the field. If
/// the field does not exist, or parsing fails, returns an error.
///
pub fn parse_field<T>(&self, field_name: &str, context: &str) -> Result<T>
where
T: FromStr,
<T as FromStr>::Err: std::error::Error + Send + Sync + 'static,
{
self.get(field_name)
.with_context(|| format!("could not find '{}' option {}", field_name, context))?
.parse::<T>()
.with_context(|| format!("could not parse '{}' option {}", field_name, context))
}
pub fn parse_field_optional<T>(&self, field_name: &str, context: &str) -> Result<Option<T>>
where
T: FromStr,
<T as FromStr>::Err: std::error::Error + Send + Sync + 'static,
{
if let Some(val) = self.get(field_name) {
let result = val
.parse::<T>()
.with_context(|| format!("could not parse '{}' option {}", field_name, context))?;
Ok(Some(result))
} else {
Ok(None)
}
}
///
/// Note: if you call this multiple times for the same option, the config
/// file will a line for each call. It would be nice to have a function
@@ -83,8 +154,48 @@ fn escape_str(s: &str) -> String {
}
}
/// De-escape a possibly-quoted value.
///
/// See `DeescapeQuotedString` function in PostgreSQL sources for how PostgreSQL
/// does this.
fn deescape_str(s: &str) -> Result<String> {
// If the string has a quote at the beginning and end, strip them out.
if s.len() >= 2 && s.starts_with('\'') && s.ends_with('\'') {
let mut result = String::new();
let mut iter = s[1..(s.len() - 1)].chars().peekable();
while let Some(c) = iter.next() {
let newc = if c == '\\' {
match iter.next() {
Some('b') => '\x08',
Some('f') => '\x0c',
Some('n') => '\n',
Some('r') => '\r',
Some('t') => '\t',
Some('0'..='7') => {
// TODO
bail!("octal escapes not supported");
}
Some(n) => n,
None => break,
}
} else if c == '\'' && iter.peek() == Some(&'\'') {
// doubled quote becomes just one quote
iter.next().unwrap()
} else {
c
};
result.push(newc);
}
Ok(result)
} else {
Ok(s.to_string())
}
}
#[test]
fn test_postgresql_conf_escapes() -> anyhow::Result<()> {
fn test_postgresql_conf_escapes() -> Result<()> {
assert_eq!(escape_str("foo bar"), "'foo bar'");
// these don't need to be quoted
assert_eq!(escape_str("foo"), "foo");
@@ -103,5 +214,13 @@ fn test_postgresql_conf_escapes() -> anyhow::Result<()> {
assert_eq!(escape_str("fo\\o"), "'fo\\\\o'");
assert_eq!(escape_str("10 cats"), "'10 cats'");
// Test de-escaping
assert_eq!(deescape_str(&escape_str("foo"))?, "foo");
assert_eq!(deescape_str(&escape_str("fo'o\nba\\r"))?, "fo'o\nba\\r");
assert_eq!(deescape_str("'\\b\\f\\n\\r\\t'")?, "\x08\x0c\n\r\t");
// octal-escapes are currently not supported
assert!(deescape_str("'foo\\7\\07\\007'").is_err());
Ok(())
}

View File

@@ -113,7 +113,7 @@ impl SafekeeperNode {
pub async fn start(
&self,
extra_opts: &[String],
extra_opts: Vec<String>,
retry_timeout: &Duration,
) -> anyhow::Result<()> {
print!(
@@ -196,7 +196,7 @@ impl SafekeeperNode {
]);
}
args.extend_from_slice(extra_opts);
args.extend(extra_opts);
background_process::start_process(
&format!("safekeeper-{id}"),

View File

@@ -346,14 +346,7 @@ impl StorageController {
let pg_log_path = pg_data_path.join("postgres.log");
if !tokio::fs::try_exists(&pg_data_path).await? {
let initdb_args = [
"--pgdata",
pg_data_path.as_ref(),
"--username",
&username(),
"--no-sync",
"--no-instructions",
];
let initdb_args = ["-D", pg_data_path.as_ref(), "--username", &username()];
tracing::info!(
"Initializing storage controller database with args: {:?}",
initdb_args

View File

@@ -11,11 +11,14 @@ clap.workspace = true
comfy-table.workspace = true
futures.workspace = true
humantime.workspace = true
hyper.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
storage_controller_client.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
utils.workspace = true

View File

@@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -339,7 +339,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
availability_zone_id: AvailabilityZone(availability_zone_id),
availability_zone_id,
}),
)
.await?;

View File

@@ -2,8 +2,8 @@
# Example docker compose configuration
The configuration in this directory is used for testing Neon docker images: it is
not intended for deploying a usable system. To run a development environment where
you can experiment with a miniature Neon system, use `cargo neon` rather than container images.
not intended for deploying a usable system. To run a development environment where
you can experiment with a minature Neon system, use `cargo neon` rather than container images.
This configuration does not start the storage controller, because the controller
needs a way to reconfigure running computes, and no such thing exists in this setup.

View File

@@ -1,343 +0,0 @@
# Independent compute release
Created at: 2024-08-30. Author: Alexey Kondratov (@ololobus)
## Summary
This document proposes an approach to fully independent compute release flow. It attempts to
cover the following features:
- Process is automated as much as possible to minimize human errors.
- Compute<->storage protocol compatibility is ensured.
- A transparent release history is available with an easy rollback strategy.
- Although not in the scope of this document, there is a viable way to extend the proposed release
flow to achieve the canary and/or blue-green deployment strategies.
## Motivation
Previously, the compute release was tightly coupled to the storage release. This meant that once
some storage nodes got restarted with a newer version, all new compute starts using these nodes
automatically got a new version. Thus, two releases happen in parallel, which increases the blast
radius and makes ownership fuzzy.
Now, we practice a manual v0 independent compute release flow -- after getting a new compute release
image and tag, we pin it region by region using Admin UI. It's better, but it still has its own flaws:
1. It's a simple but fairly manual process, as you need to click through a few pages.
2. It's prone to human errors, e.g., you could mistype or copy the wrong compute tag.
3. We now require an additional approval in the Admin UI, which partially solves the 2.,
but also makes the whole process pretty annoying, as you constantly need to go back
and forth between two people.
## Non-goals
It's not the goal of this document to propose a design for some general-purpose release tool like Helm.
The document considers how the current compute fleet is orchestrated at Neon. Even if we later
decide to split the control plane further (e.g., introduce a separate compute controller), the proposed
release process shouldn't change much, i.e., the releases table and API will reside in
one of the parts.
Achieving the canary and/or blue-green deploy strategies is out of the scope of this document. They
were kept in mind, though, so it's expected that the proposed approach will lay down the foundation
for implementing them in future iterations.
## Impacted components
Compute, control plane, CI, observability (some Grafana dashboards may require changes).
## Prior art
One of the very close examples is how Helm tracks [releases history](https://helm.sh/docs/helm/helm_history/).
In the code:
- [Release](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/release.go#L20-L43)
- [Release info](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/info.go#L24-L40)
- [Release status](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/status.go#L18-L42)
TL;DR it has several important attributes:
- Revision -- unique release ID/primary key. It is not the same as the application version,
because the same version can be deployed several times, e.g., after a newer version rollback.
- App version -- version of the application chart/code.
- Config -- set of overrides to the default config of the application.
- Status -- current status of the release in the history.
- Timestamps -- tracks when a release was created and deployed.
## Proposed implementation
### Separate release branch
We will use a separate release branch, `release-compute`, to have a clean history for releases and commits.
In order to avoid confusion with storage releases, we will use a different prefix for compute [git release
tags](https://github.com/neondatabase/neon/releases) -- `release-compute-XXXX`. We will use the same tag for
Docker images as well. The `neondatabase/compute-node-v16:release-compute-XXXX` looks longer and a bit redundant,
but it's better to have image and git tags in sync.
Currently, control plane relies on the numeric compute and storage release versions to decide on compute->storage
compatibility. Once we implement this proposal, we should drop this code as release numbers will be completely
independent. The only constraint we want is that it must monotonically increase within the same release branch.
### Compute config/settings manifest
We will create a new sub-directory `compute` and file `compute/manifest.yaml` with a structure:
```yaml
pg_settings:
# Common settings for primaries and secondaries of all versions.
common:
wal_log_hints: "off"
max_wal_size: "1024"
per_version:
14:
# Common settings for both replica and primary of version PG 14
common:
shared_preload_libraries: "neon,pg_stat_statements,extension_x"
15:
common:
shared_preload_libraries: "neon,pg_stat_statements,extension_x"
# Settings that should be applied only to
replica:
# Available only starting Postgres 15th
recovery_prefetch: "off"
# ...
17:
common:
# For example, if third-party `extension_x` is not yet available for PG 17
shared_preload_libraries: "neon,pg_stat_statements"
replica:
recovery_prefetch: "off"
```
**N.B.** Setting value should be a string with `on|off` for booleans and a number (as a string)
without units for all numeric settings. That's how the control plane currently operates.
The priority of settings will be (a higher number is a higher priority):
1. Any static and hard-coded settings in the control plane
2. `pg_settings->common`
3. Per-version `common`
4. Per-version `replica`
5. Any per-user/project/endpoint overrides in the control plane
6. Any dynamic setting calculated based on the compute size
**N.B.** For simplicity, we do not do any custom logic for `shared_preload_libraries`, so it's completely
overridden if specified on some level. Make sure that you include all necessary extensions in it when you
do any overrides.
**N.B.** There is a tricky question about what to do with custom compute image pinning we sometimes
do for particular projects and customers. That's usually some ad-hoc work and images are based on
the latest compute image, so it's relatively safe to assume that we could use settings from the latest compute
release. If for some reason that's not true, and further overrides are needed, it's also possible to do
on the project level together with pinning the image, so it's on-call/engineer/support responsibility to
ensure that compute starts with the specified custom image. The only real risk is that compute image will get
stale and settings from new releases will drift away, so eventually it will get something incompatible,
but i) this is some operational issue, as we do not want stale images anyway, and ii) base settings
receive something really new so rarely that the chance of this happening is very low. If we want to solve it completely,
then together with pinning the image we could also pin the matching release revision in the control plane.
The compute team will own the content of `compute/manifest.yaml`.
### Control plane: releases table
In order to store information about releases, the control plane will use a table `compute_releases` with the following
schema:
```sql
CREATE TABLE compute_releases (
-- Unique release ID
-- N.B. Revision won't by synchronized across all regions, because all control planes are technically independent
-- services. We have the same situation with Helm releases as well because they could be deployed and rolled back
-- independently in different clusters.
revision BIGSERIAL PRIMARY KEY,
-- Numeric version of the compute image, e.g. 9057
version BIGINT NOT NULL,
-- Compute image tag, e.g. `release-9057`
tag TEXT NOT NULL,
-- Current release status. Currently, it will be a simple enum
-- * `deployed` -- release is deployed and used for new compute starts.
-- Exactly one release can have this status at a time.
-- * `superseded` -- release has been replaced by a newer one.
-- But we can always extend it in the future when we need more statuses
-- for more complex deployment strategies.
status TEXT NOT NULL,
-- Any additional metadata for compute in the corresponding release
manifest JSONB NOT NULL,
-- Timestamp when release record was created in the control plane database
created_at TIMESTAMP NOT NULL DEFAULT now(),
-- Timestamp when release deployment was finished
deployed_at TIMESTAMP
);
```
We keep track of the old releases not only for the sake of audit, but also because we usually have ~30% of
old computes started using the image from one of the previous releases. Yet, when users want to reconfigure
them without restarting, the control plane needs to know what settings are applicable to them, so we also need
information about the previous releases that are readily available. There could be some other auxiliary info
needed as well: supported extensions, compute flags, etc.
**N.B.** Here, we can end up in an ambiguous situation when the same compute image is deployed twice, e.g.,
it was deployed once, then rolled back, and then deployed again, potentially with a different manifest. Yet,
we could've started some computes with the first deployment and some with the second. Thus, when we need to
look up the manifest for the compute by its image tag, we will see two records in the table with the same tag,
but different revision numbers. We can assume that this could happen only in case of rollbacks, so we
can just take the latest revision for the given tag.
### Control plane: management API
The control plane will implement new API methods to manage releases:
1. `POST /management/api/v2/compute_releases` to create a new release. With payload
```json
{
"version": 9057,
"tag": "release-9057",
"manifest": {}
}
```
and response
```json
{
"revision": 53,
"version": 9057,
"tag": "release-9057",
"status": "deployed",
"manifest": {},
"created_at": "2024-08-15T15:52:01.0000Z",
"deployed_at": "2024-08-15T15:52:01.0000Z",
}
```
Here, we can actually mix-in custom (remote) extensions metadata into the `manifest`, so that the control plane
will get information about all available extensions not bundled into compute image. The corresponding
workflow in `neondatabase/build-custom-extensions` should produce it as an artifact and make
it accessible to the workflow in the `neondatabase/infra`. See the complete release flow below. Doing that,
we put a constraint that new custom extension requires new compute release, which is good for the safety,
but is not exactly what we want operational-wise (we want to be able to deploy new extensions without new
images). Yet, it can be solved incrementally: v0 -- do not do anything with extensions at all;
v1 -- put them into the same manifest; v2 -- make them separate entities with their own lifecycle.
**N.B.** This method is intended to be used in CI workflows, and CI/network can be flaky. It's reasonable
to assume that we could retry the request several times, even though it's already succeeded. Although it's
not a big deal to create several identical releases one-by-one, it's better to avoid it, so the control plane
should check if the latest release is identical and just return `304 Not Modified` in this case.
2. `POST /management/api/v2/compute_releases/rollback` to rollback to any previously deployed release. With payload
including the revision of the release to rollback to:
```json
{
"revision": 52
}
```
Rollback marks the current release as `superseded` and creates a new release with all the same data as the
requested revision, but with a new revision number.
This rollback API is not strictly needed, as we can just use `infra` repo workflow to deploy any
available tag. It's still nice to have for on-call and any urgent matters, for example, if we need
to rollback and GitHub is down. It's much easier to specify only the revision number vs. crafting
all the necessary data for the new release payload.
### Compute->storage compatibility tests
In order to safely release new compute versions independently from storage, we need to ensure that the currently
deployed storage is compatible with the new compute version. Currently, we maintain backward compatibility
in storage, but newer computes may require a newer storage version.
Remote end-to-end (e2e) tests [already accept](https://github.com/neondatabase/cloud/blob/e3468d433e0d73d02b7d7e738d027f509b522408/.github/workflows/testing.yml#L43-L48)
`storage_image_tag` and `compute_image_tag` as separate inputs. That means that we could reuse e2e tests to ensure
compatibility between storage and compute:
1. Pick the latest storage release tag and use it as `storage_image_tag`.
2. Pick a new compute tag built in the current compute release PR and use it as `compute_image_tag`.
Here, we should use a temporary ECR image tag, because the final tag will be known only after the release PR is merged.
3. Trigger e2e tests as usual.
### Release flow
```mermaid
sequenceDiagram
actor oncall as Compute on-call person
participant neon as neondatabase/neon
box private
participant cloud as neondatabase/cloud
participant exts as neondatabase/build-custom-extensions
participant infra as neondatabase/infra
end
box cloud
participant preprod as Pre-prod control plane
participant prod as Production control plane
participant k8s as Compute k8s
end
oncall ->> neon: Open release PR into release-compute
activate neon
neon ->> cloud: CI: trigger e2e compatibility tests
activate cloud
cloud -->> neon: CI: e2e tests pass
deactivate cloud
neon ->> neon: CI: pass PR checks, get approvals
deactivate neon
oncall ->> neon: Merge release PR into release-compute
activate neon
neon ->> neon: CI: pass checks, build and push images
neon ->> exts: CI: trigger extensions build
activate exts
exts -->> neon: CI: extensions are ready
deactivate exts
neon ->> neon: CI: create release tag
neon ->> infra: Trigger release workflow using the produced tag
deactivate neon
activate infra
infra ->> infra: CI: pass checks
infra ->> preprod: Release new compute image to pre-prod automatically <br/> POST /management/api/v2/compute_releases
activate preprod
preprod -->> infra: 200 OK
deactivate preprod
infra ->> infra: CI: wait for per-region production deploy approvals
oncall ->> infra: CI: approve deploys region by region
infra ->> k8s: Prewarm new compute image
infra ->> prod: POST /management/api/v2/compute_releases
activate prod
prod -->> infra: 200 OK
deactivate prod
deactivate infra
```
## Further work
As briefly mentioned in other sections, eventually, we would like to use more complex deployment strategies.
For example, we can pass a fraction of the total compute starts that should use the new release. Then we can
mark the release as `partial` or `canary` and monitor its performance. If everything is fine, we can promote it
to `deployed` status. If not, we can roll back to the previous one.
## Alternatives
In theory, we can try using Helm as-is:
1. Write a compute Helm chart. That will actually have only some config map, which the control plane can access and read.
N.B. We could reuse the control plane chart as well, but then it's not a fully independent release again and even more fuzzy.
2. The control plane will read it and start using the new compute version for new starts.
Drawbacks:
1. Helm releases work best if the workload is controlled by the Helm chart itself. Then you can have different
deployment strategies like rolling update or canary or blue/green deployments. At Neon, the compute starts are controlled
by control plane, so it makes it much more tricky.
2. Releases visibility will suffer, i.e. instead of a nice table in the control plane and Admin UI, we would need to use
`helm` cli and/or K8s UIs like K8sLens.
3. We do not restart all computes shortly after the new version release. This means that for some features and compatibility
purpose (see above) control plane may need some auxiliary info from the previous releases.

View File

@@ -8,6 +8,7 @@ license.workspace = true
anyhow.workspace = true
chrono.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
regex.workspace = true

View File

@@ -50,16 +50,6 @@ pub struct ComputeSpec {
#[serde(default)]
pub swap_size_bytes: Option<u64>,
/// If compute_ctl was passed `--set-disk-quota-for-fs`, a value of `Some(_)` instructs
/// compute_ctl to run `/neonvm/bin/set-disk-quota` with the given size and fs, when the
/// spec is first received.
///
/// Both this field and `--set-disk-quota-for-fs` are required, so that the control plane's
/// spec generation doesn't need to be aware of the actual compute it's running on, while
/// guaranteeing gradual rollout of disk quota.
#[serde(default)]
pub disk_quota_bytes: Option<u64>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
@@ -278,22 +268,6 @@ pub struct GenericOption {
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;
/// Configured the local-proxy application with the relevant JWKS and roles it should
/// use for authorizing connect requests using JWT.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LocalProxySpec {
pub jwks: Vec<JwksSettings>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JwksSettings {
pub id: String,
pub role_names: Vec<String>,
pub jwks_url: String,
pub provider_name: String,
pub jwt_audience: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -5,6 +5,9 @@ edition = "2021"
license = "Apache-2.0"
[dependencies]
anyhow.workspace = true
chrono = { workspace = true, features = ["serde"] }
rand.workspace = true
serde.workspace = true
serde_with.workspace = true
utils.workspace = true

View File

@@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
use rand::Rng;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[serde(tag = "type")]
pub enum EventType {
#[serde(rename = "absolute")]
@@ -107,7 +107,7 @@ pub const CHUNK_SIZE: usize = 1000;
// Just a wrapper around a slice of events
// to serialize it as `{"events" : [ ] }
#[derive(serde::Serialize, Deserialize)]
#[derive(serde::Serialize, serde::Deserialize)]
pub struct EventChunk<'a, T: Clone> {
pub events: std::borrow::Cow<'a, [T]>,
}

View File

@@ -12,4 +12,5 @@ bytes.workspace = true
utils.workspace = true
parking_lot.workspace = true
hex.workspace = true
scopeguard.workspace = true
smallvec = { workspace = true, features = ["write"] }

View File

@@ -104,6 +104,9 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
#[serde(skip_serializing)]
// TODO(https://github.com/neondatabase/neon/issues/8184): remove after this field is removed from all pageserver.toml's
pub compact_level0_phase1_value_access: serde::de::IgnoredAny,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
}
@@ -170,6 +173,40 @@ impl Default for EvictionOrder {
}
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetVectoredImpl {
Sequential,
Vectored,
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetImpl {
Legacy,
Vectored,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
@@ -301,6 +338,8 @@ pub mod defaults {
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
ImageCompressionAlgorithm::Zstd { level: Some(1) };
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
@@ -337,10 +376,7 @@ impl Default for ConfigToml {
concurrent_tenant_warmup: (NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
.expect("Invalid default constant")),
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(
DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES,
)
.unwrap(),
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(1).unwrap(),
metric_collection_interval: (humantime::parse_duration(
DEFAULT_METRIC_COLLECTION_INTERVAL,
)
@@ -381,6 +417,7 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
compact_level0_phase1_value_access: Default::default(),
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,
@@ -430,6 +467,8 @@ pub mod tenant_conf_defaults {
// By default ingest enough WAL for two new L0 layers before checking if new image
// image layers should be created.
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
}
impl Default for TenantConfigToml {

View File

@@ -1,5 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::time::{Duration, Instant};
@@ -58,7 +57,7 @@ pub struct NodeRegisterRequest {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub availability_zone_id: AvailabilityZone,
pub availability_zone_id: String,
}
#[derive(Serialize, Deserialize)]
@@ -75,19 +74,10 @@ pub struct TenantPolicyRequest {
pub scheduling: Option<ShardSchedulingPolicy>,
}
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct AvailabilityZone(pub String);
impl Display for AvailabilityZone {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
#[serde(flatten)]
pub preferred_az_ids: HashMap<TenantShardId, AvailabilityZone>,
pub preferred_az_ids: HashMap<TenantShardId, String>,
}
#[derive(Serialize, Deserialize)]

View File

@@ -37,11 +37,14 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
/// ```mermaid
/// stateDiagram-v2
///
/// [*] --> Loading: spawn_load()
/// [*] --> Attaching: spawn_attach()
///
/// Loading --> Activating: activate()
/// Attaching --> Activating: activate()
/// Activating --> Active: infallible
///
/// Loading --> Broken: load() failure
/// Attaching --> Broken: attach() failure
///
/// Active --> Stopping: set_stopping(), part of shutdown & detach
@@ -65,6 +68,10 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is being loaded from local disk.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Loading,
/// This tenant is being attached to the pageserver.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
@@ -114,6 +121,8 @@ impl TenantState {
// But, our attach task might still be fetching the remote timelines, etc.
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
// We only reach Active after successful load / attach.
// So, call atttachment status Attached.
Self::Active => Attached,
@@ -182,11 +191,10 @@ impl LsnLease {
}
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
///
/// XXX: We used to have more variants here, but now it's just one, which makes this rather
/// useless. Remove, once we've checked that there's no client code left that looks at this.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
/// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
Loading,
/// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
Attaching,
}
@@ -487,7 +495,7 @@ pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,
}
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
#[serde(rename_all = "snake_case")]
@@ -1554,8 +1562,11 @@ mod tests {
#[test]
fn tenantstatus_activating_serde() {
let states = [TenantState::Activating(ActivatingFrom::Attaching)];
let expected = "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
let states = [
TenantState::Activating(ActivatingFrom::Loading),
TenantState::Activating(ActivatingFrom::Attaching),
];
let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
let actual = serde_json::to_string(&states).unwrap();
@@ -1570,7 +1581,13 @@ mod tests {
fn tenantstatus_activating_strum() {
// tests added, because we use these for metrics
let examples = [
(line!(), TenantState::Loading, "Loading"),
(line!(), TenantState::Attaching, "Attaching"),
(
line!(),
TenantState::Activating(ActivatingFrom::Loading),
"Activating",
),
(
line!(),
TenantState::Activating(ActivatingFrom::Attaching),

View File

@@ -5,8 +5,10 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
anyhow.workspace = true
bytes.workspace = true
futures.workspace = true
rustls.workspace = true
serde.workspace = true
thiserror.workspace = true

View File

@@ -280,6 +280,16 @@ pub struct PostgresBackend<IO> {
pub type PostgresBackendTCP = PostgresBackend<tokio::net::TcpStream>;
pub fn query_from_cstring(query_string: Bytes) -> Vec<u8> {
let mut query_string = query_string.to_vec();
if let Some(ch) = query_string.last() {
if *ch == 0 {
query_string.pop();
}
}
query_string
}
/// Cast a byte slice to a string slice, dropping null terminator if there's one.
fn cstr_to_str(bytes: &[u8]) -> anyhow::Result<&str> {
let without_null = bytes.strip_suffix(&[0]).unwrap_or(bytes);
@@ -984,7 +994,6 @@ pub fn short_error(e: &QueryError) -> String {
}
fn log_query_error(query: &str, e: &QueryError) {
// If you want to change the log level of a specific error, also re-categorize it in `BasebackupQueryTimeOngoingRecording`.
match e {
QueryError::Disconnected(ConnectionError::Io(io_error)) => {
if is_expected_io_error(io_error) {

View File

@@ -5,10 +5,13 @@ edition.workspace = true
license.workspace = true
[dependencies]
rand.workspace = true
regex.workspace = true
bytes.workspace = true
byteorder.workspace = true
anyhow.workspace = true
crc32c.workspace = true
hex.workspace = true
once_cell.workspace = true
log.workspace = true
memoffset.workspace = true

View File

@@ -9,8 +9,8 @@
//! comments on them.
//!
use crate::PageHeaderData;
use crate::BLCKSZ;
use crate::{PageHeaderData, XLogRecord};
//
// From pg_tablespace_d.h
@@ -194,6 +194,8 @@ pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
pub const XLOG_TBLSPC_CREATE: u8 = 0x00;
pub const XLOG_TBLSPC_DROP: u8 = 0x10;
pub const SIZEOF_XLOGRECORD: u32 = size_of::<XLogRecord>() as u32;
//
// from xlogrecord.h
//
@@ -217,6 +219,8 @@ pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
/* From transam.h */
pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3;
pub const INVALID_TRANSACTION_ID: u32 = 0;
pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000;
pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* pg_control.h */
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;

View File

@@ -26,12 +26,11 @@ use bytes::{Buf, Bytes};
use log::*;
use serde::Serialize;
use std::ffi::OsStr;
use std::fs::File;
use std::io::prelude::*;
use std::io::ErrorKind;
use std::io::SeekFrom;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use utils::bin_ser::DeserializeError;
use utils::bin_ser::SerializeError;
@@ -79,34 +78,19 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
)
}
pub fn XLogFromFileName(
fname: &OsStr,
wal_seg_size: usize,
) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
if let Some(fname_str) = fname.to_str() {
let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo;
let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo;
Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli))
} else {
anyhow::bail!("non-ut8 filename: {:?}", fname);
}
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
}
pub fn IsXLogFileName(fname: &OsStr) -> bool {
if let Some(fname) = fname.to_str() {
fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
} else {
false
}
pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
}
pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
if let Some(fname) = fname.to_str() {
fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8]))
} else {
false
}
pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
}
/// If LSN points to the beginning of the page, then shift it to first record,
@@ -276,6 +260,13 @@ fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
}
}
pub fn main() {
let mut data_dir = PathBuf::new();
data_dir.push(".");
let wal_end = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, Lsn(0)).unwrap();
println!("wal_end={:?}", wal_end);
}
impl XLogRecord {
pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
use utils::bin_ser::LeSer;

View File

@@ -9,6 +9,7 @@ anyhow.workspace = true
clap.workspace = true
env_logger.workspace = true
log.workspace = true
once_cell.workspace = true
postgres.workspace = true
postgres_ffi.workspace = true
camino-tempfile.workspace = true

View File

@@ -7,7 +7,6 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
@@ -27,6 +26,7 @@ macro_rules! xlog_utils_test {
postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conf {
pub pg_version: u32,
pub pg_distrib_dir: PathBuf,
@@ -93,9 +93,9 @@ impl Conf {
);
let output = self
.new_pg_command("initdb")?
.arg("--pgdata")
.arg("-D")
.arg(&self.datadir)
.args(["--username", "postgres", "--no-instructions", "--no-sync"])
.args(["-U", "postgres", "--no-instructions", "--no-sync"])
.output()?;
debug!("initdb output: {:?}", output);
ensure!(
@@ -136,8 +136,8 @@ impl Conf {
pub fn pg_waldump(
&self,
first_segment_name: &OsStr,
last_segment_name: &OsStr,
first_segment_name: &str,
last_segment_name: &str,
) -> anyhow::Result<std::process::Output> {
let first_segment_file = self.datadir.join(first_segment_name);
let last_segment_file = self.datadir.join(last_segment_name);

View File

@@ -4,7 +4,6 @@ use super::*;
use crate::{error, info};
use regex::Regex;
use std::cmp::min;
use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::Write;
use std::{env, str::FromStr};
@@ -55,7 +54,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
.wal_dir()
.read_dir()
.unwrap()
.map(|f| f.unwrap().file_name())
.map(|f| f.unwrap().file_name().into_string().unwrap())
.filter(|fname| IsXLogFileName(fname))
.max()
.unwrap();
@@ -71,11 +70,11 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
start_lsn
);
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
let fname = file.file_name();
let fname = file.file_name().into_string().unwrap();
if !IsXLogFileName(&fname) {
continue;
}
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE).unwrap();
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
if seg_start_lsn > u64::from(*start_lsn) {
continue;
@@ -94,10 +93,10 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
}
}
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn {
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
// Get the actual end of WAL by pg_waldump
let waldump_output = cfg
.pg_waldump(OsStr::new("000000010000000000000001"), last_segment)
.pg_waldump("000000010000000000000001", last_segment)
.unwrap()
.stderr;
let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
@@ -118,7 +117,7 @@ fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn {
fn check_end_of_wal(
cfg: &crate::Conf,
last_segment: &OsStr,
last_segment: &str,
start_lsn: Lsn,
expected_end_of_wal: Lsn,
) {
@@ -133,8 +132,7 @@ fn check_end_of_wal(
// Rename file to partial to actually find last valid lsn, then rename it back.
fs::rename(
cfg.wal_dir().join(last_segment),
cfg.wal_dir()
.join(format!("{}.partial", last_segment.to_str().unwrap())),
cfg.wal_dir().join(format!("{}.partial", last_segment)),
)
.unwrap();
let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
@@ -144,8 +142,7 @@ fn check_end_of_wal(
);
assert_eq!(wal_end, expected_end_of_wal);
fs::rename(
cfg.wal_dir()
.join(format!("{}.partial", last_segment.to_str().unwrap())),
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir().join(last_segment),
)
.unwrap();

View File

@@ -8,8 +8,10 @@ license.workspace = true
bytes.workspace = true
byteorder.workspace = true
itertools.workspace = true
pin-project-lite.workspace = true
postgres-protocol.workspace = true
rand.workspace = true
tokio = { workspace = true, features = ["io-util"] }
tracing.workspace = true
thiserror.workspace = true
serde.workspace = true

View File

@@ -13,11 +13,14 @@ aws-smithy-async.workspace = true
aws-smithy-types.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-credential-types.workspace = true
bytes.workspace = true
camino = { workspace = true, features = ["serde1"] }
humantime.workspace = true
humantime-serde.workspace = true
hyper = { workspace = true, features = ["stream"] }
futures.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }

View File

@@ -127,6 +127,10 @@ impl RemotePath {
&self.0
}
pub fn extension(&self) -> Option<&str> {
self.0.extension()
}
pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
self.0.strip_prefix(&p.0)
}

View File

@@ -6,5 +6,6 @@ license.workspace = true
[dependencies]
serde.workspace = true
serde_with.workspace = true
const_format.workspace = true
utils.workspace = true

View File

@@ -9,9 +9,8 @@ hyper.workspace = true
opentelemetry = { workspace = true, features=["rt-tokio"] }
opentelemetry-otlp = { workspace = true, default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions.workspace = true
reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
[dev-dependencies]
tracing-subscriber.workspace = true # For examples in docs
tracing-subscriber.workspace = true

View File

@@ -19,7 +19,6 @@ bincode.workspace = true
bytes.workspace = true
camino.workspace = true
chrono.workspace = true
git-version.workspace = true
hex = { workspace = true, features = ["serde"] }
humantime.workspace = true
hyper = { workspace = true, features = ["full"] }
@@ -43,6 +42,7 @@ tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
rand.workspace = true
serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
url.workspace = true

33
libs/utils/src/accum.rs Normal file
View File

@@ -0,0 +1,33 @@
/// A helper to "accumulate" a value similar to `Iterator::reduce`, but lets you
/// feed the accumulated values by calling the 'accum' function, instead of having an
/// iterator.
///
/// For example, to calculate the smallest value among some integers:
///
/// ```
/// use utils::accum::Accum;
///
/// let values = [1, 2, 3];
///
/// let mut min_value: Accum<u32> = Accum(None);
/// for new_value in &values {
/// min_value.accum(std::cmp::min, *new_value);
/// }
///
/// assert_eq!(min_value.0.unwrap(), 1);
/// ```
pub struct Accum<T>(pub Option<T>);
impl<T: Copy> Accum<T> {
pub fn accum<F>(&mut self, func: F, new_value: T)
where
F: FnOnce(T, T) -> T,
{
// If there is no previous value, just store the new value.
// Otherwise call the function to decide which one to keep.
self.0 = Some(if let Some(accum) = self.0 {
func(accum, new_value)
} else {
new_value
});
}
}

View File

@@ -82,7 +82,7 @@ impl ApiError {
StatusCode::INTERNAL_SERVER_ERROR,
),
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
format!("{err:#}"), // use alternative formatting so that we give the cause without backtrace
err.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
),
}

View File

@@ -88,6 +88,12 @@ impl<'de> Deserialize<'de> for Id {
}
impl Id {
pub fn get_from_buf(buf: &mut impl bytes::Buf) -> Id {
let mut arr = [0u8; 16];
buf.copy_to_slice(&mut arr);
Id::from(arr)
}
pub fn from_slice(src: &[u8]) -> Result<Id, IdError> {
if src.len() != 16 {
return Err(IdError::SliceParseError(src.len()));
@@ -173,6 +179,10 @@ impl fmt::Debug for Id {
macro_rules! id_newtype {
($t:ident) => {
impl $t {
pub fn get_from_buf(buf: &mut impl bytes::Buf) -> $t {
$t(Id::get_from_buf(buf))
}
pub fn from_slice(src: &[u8]) -> Result<$t, IdError> {
Ok($t(Id::from_slice(src)?))
}

View File

@@ -21,13 +21,7 @@
//!
//! Another explaination can be found here: <https://brandur.org/rate-limiting>
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Mutex,
},
time::Duration,
};
use std::{sync::Mutex, time::Duration};
use tokio::{sync::Notify, time::Instant};
@@ -134,7 +128,6 @@ impl LeakyBucketState {
pub struct RateLimiter {
pub config: LeakyBucketConfig,
pub sleep_counter: AtomicU64,
pub state: Mutex<LeakyBucketState>,
/// a queue to provide this fair ordering.
pub queue: Notify,
@@ -151,7 +144,6 @@ impl Drop for Requeue<'_> {
impl RateLimiter {
pub fn with_initial_tokens(config: LeakyBucketConfig, initial_tokens: f64) -> Self {
RateLimiter {
sleep_counter: AtomicU64::new(0),
state: Mutex::new(LeakyBucketState::with_initial_tokens(
&config,
initial_tokens,
@@ -171,16 +163,15 @@ impl RateLimiter {
/// returns true if we did throttle
pub async fn acquire(&self, count: usize) -> bool {
let start = tokio::time::Instant::now();
let mut throttled = false;
let start_count = self.sleep_counter.load(Ordering::Acquire);
let mut end_count = start_count;
let start = tokio::time::Instant::now();
// wait until we are the first in the queue
let mut notified = std::pin::pin!(self.queue.notified());
if !notified.as_mut().enable() {
throttled = true;
notified.await;
end_count = self.sleep_counter.load(Ordering::Acquire);
}
// notify the next waiter in the queue when we are done.
@@ -193,22 +184,9 @@ impl RateLimiter {
.unwrap()
.add_tokens(&self.config, start, count as f64);
match res {
Ok(()) => return end_count > start_count,
Ok(()) => return throttled,
Err(ready_at) => {
struct Increment<'a>(&'a AtomicU64);
impl Drop for Increment<'_> {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::AcqRel);
}
}
// increment the counter after we finish sleeping (or cancel this task).
// this ensures that tasks that have already started the acquire will observe
// the new sleep count when they are allowed to resume on the notify.
let _inc = Increment(&self.sleep_counter);
end_count += 1;
throttled = true;
tokio::time::sleep_until(ready_at).await;
}
}

View File

@@ -43,9 +43,16 @@ pub mod logging;
pub mod lock_file;
pub mod pid_file;
// Misc
pub mod accum;
pub mod shutdown;
// Utility for binding TcpListeners with proper socket options.
pub mod tcp_listener;
// Utility for putting a raw file descriptor into non-blocking mode
pub mod nonblock;
// Default signal handling
pub mod sentry_init;
pub mod signals;
@@ -92,10 +99,6 @@ pub mod toml_edit_ext;
pub mod circuit_breaker;
// Re-export used in macro. Avoids adding git-version as dep in target crates.
#[doc(hidden)]
pub use git_version;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:
@@ -135,7 +138,7 @@ macro_rules! project_git_version {
($const_identifier:ident) => {
// this should try GIT_VERSION first only then git_version::git_version!
const $const_identifier: &::core::primitive::str = {
const __COMMIT_FROM_GIT: &::core::primitive::str = $crate::git_version::git_version! {
const __COMMIT_FROM_GIT: &::core::primitive::str = git_version::git_version! {
prefix = "",
fallback = "unknown",
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha

View File

@@ -1,5 +1,6 @@
#![warn(missing_docs)]
use camino::Utf8Path;
use serde::{de::Visitor, Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
@@ -144,6 +145,14 @@ impl Lsn {
i128::from(self.0) - i128::from(other)
}
/// Parse an LSN from a filename in the form `0000000000000000`
pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
where
F: AsRef<Utf8Path>,
{
Lsn::from_hex(filename.as_ref().as_str())
}
/// Parse an LSN from a string in the form `0000000000000000`
pub fn from_hex<S>(s: S) -> Result<Self, LsnParseError>
where

View File

@@ -0,0 +1,17 @@
use nix::fcntl::{fcntl, OFlag, F_GETFL, F_SETFL};
use std::os::unix::io::RawFd;
/// Put a file descriptor into non-blocking mode
pub fn set_nonblock(fd: RawFd) -> Result<(), std::io::Error> {
let bits = fcntl(fd, F_GETFL)?;
// If F_GETFL returns some unknown bits, they should be valid
// for passing back to F_SETFL, too. If we left them out, the F_SETFL
// would effectively clear them, which is not what we want.
let mut flags = OFlag::from_bits_retain(bits);
flags |= OFlag::O_NONBLOCK;
fcntl(fd, F_SETFL(flags))?;
Ok(())
}

View File

@@ -0,0 +1,7 @@
/// Immediately terminate the calling process without calling
/// atexit callbacks, C runtime destructors etc. We mainly use
/// this to protect coverage data from concurrent writes.
pub fn exit_now(code: u8) -> ! {
// SAFETY: exiting is safe, the ffi is not safe
unsafe { nix::libc::_exit(code as _) };
}

View File

@@ -120,6 +120,32 @@ impl<K: Ord, V> VecMap<K, V> {
Ok((None, delta_size))
}
/// Split the map into two.
///
/// The left map contains everything before `cutoff` (exclusive).
/// Right map contains `cutoff` and everything after (inclusive).
pub fn split_at(&self, cutoff: &K) -> (Self, Self)
where
K: Clone,
V: Clone,
{
let split_idx = self
.data
.binary_search_by_key(&cutoff, extract_key)
.unwrap_or_else(std::convert::identity);
(
VecMap {
data: self.data[..split_idx].to_vec(),
ordering: self.ordering,
},
VecMap {
data: self.data[split_idx..].to_vec(),
ordering: self.ordering,
},
)
}
/// Move items from `other` to the end of `self`, leaving `other` empty.
/// If the `other` ordering is different from `self` ordering
/// `ExtendOrderingError` error will be returned.

View File

@@ -15,11 +15,13 @@ anyhow.workspace = true
axum.workspace = true
clap.workspace = true
futures.workspace = true
inotify.workspace = true
serde.workspace = true
serde_json.workspace = true
sysinfo.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-postgres.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true

View File

@@ -15,6 +15,7 @@ anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bit_field.workspace = true
byteorder.workspace = true
bytes.workspace = true
@@ -22,11 +23,15 @@ camino.workspace = true
camino-tempfile.workspace = true
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["string"] }
const_format.workspace = true
consumption_metrics.workspace = true
crc32c.workspace = true
crossbeam-utils.workspace = true
either.workspace = true
flate2.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
@@ -52,6 +57,10 @@ serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
serde_path_to_error.workspace = true
serde_with.workspace = true
signal-hook.workspace = true
smallvec = { workspace = true, features = ["write"] }
svg_fmt.workspace = true
sync_wrapper.workspace = true
sysinfo.workspace = true
tokio-tar.workspace = true
thiserror.workspace = true
@@ -64,6 +73,7 @@ tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tracing.workspace = true
twox-hash.workspace = true
url.workspace = true
walkdir.workspace = true
metrics.workspace = true

View File

@@ -1,7 +1,7 @@
//! Quantify a single walredo manager's throughput under N concurrent callers.
//!
//! The benchmark implementation ([`bench_impl`]) is parametrized by
//! - `redo_work` => an async closure that takes a `PostgresRedoManager` and performs one redo
//! - `redo_work` => [`Request::short_request`] or [`Request::medium_request`]
//! - `n_redos` => number of times the benchmark shell execute the `redo_work`
//! - `nclients` => number of clients (more on this shortly).
//!
@@ -10,7 +10,7 @@
//! Each task executes the `redo_work` `n_redos/nclients` times.
//!
//! We exercise the following combinations:
//! - `redo_work = ping / short / medium``
//! - `redo_work = short / medium``
//! - `nclients = [1, 2, 4, 8, 16, 32, 64, 128]`
//!
//! We let `criterion` determine the `n_redos` using `iter_custom`.
@@ -27,43 +27,33 @@
//!
//! # Reference Numbers
//!
//! 2024-09-18 on im4gn.2xlarge
//! 2024-04-15 on i3en.3xlarge
//!
//! ```text
//! ping/1 time: [21.789 µs 21.918 µs 22.078 µs]
//! ping/2 time: [27.686 µs 27.812 µs 27.970 µs]
//! ping/4 time: [35.468 µs 35.671 µs 35.926 µs]
//! ping/8 time: [59.682 µs 59.987 µs 60.363 µs]
//! ping/16 time: [101.79 µs 102.37 µs 103.08 µs]
//! ping/32 time: [184.18 µs 185.15 µs 186.36 µs]
//! ping/64 time: [349.86 µs 351.45 µs 353.47 µs]
//! ping/128 time: [684.53 µs 687.98 µs 692.17 µs]
//! short/1 time: [31.833 µs 32.126 µs 32.428 µs]
//! short/2 time: [35.558 µs 35.756 µs 35.992 µs]
//! short/4 time: [44.850 µs 45.138 µs 45.484 µs]
//! short/8 time: [65.985 µs 66.379 µs 66.853 µs]
//! short/16 time: [127.06 µs 127.90 µs 128.87 µs]
//! short/32 time: [252.98 µs 254.70 µs 256.73 µs]
//! short/64 time: [497.13 µs 499.86 µs 503.26 µs]
//! short/128 time: [987.46 µs 993.45 µs 1.0004 ms]
//! medium/1 time: [137.91 µs 138.55 µs 139.35 µs]
//! medium/2 time: [192.00 µs 192.91 µs 194.07 µs]
//! medium/4 time: [389.62 µs 391.55 µs 394.01 µs]
//! medium/8 time: [776.80 µs 780.33 µs 784.77 µs]
//! medium/16 time: [1.5323 ms 1.5383 ms 1.5459 ms]
//! medium/32 time: [3.0120 ms 3.0226 ms 3.0350 ms]
//! medium/64 time: [5.7405 ms 5.7787 ms 5.8166 ms]
//! medium/128 time: [10.412 ms 10.574 ms 10.718 ms]
//! short/1 time: [24.584 µs 24.737 µs 24.922 µs]
//! short/2 time: [33.479 µs 33.660 µs 33.888 µs]
//! short/4 time: [42.713 µs 43.046 µs 43.440 µs]
//! short/8 time: [71.814 µs 72.478 µs 73.240 µs]
//! short/16 time: [132.73 µs 134.45 µs 136.22 µs]
//! short/32 time: [258.31 µs 260.73 µs 263.27 µs]
//! short/64 time: [511.61 µs 514.44 µs 517.51 µs]
//! short/128 time: [992.64 µs 998.23 µs 1.0042 ms]
//! medium/1 time: [110.11 µs 110.50 µs 110.96 µs]
//! medium/2 time: [153.06 µs 153.85 µs 154.99 µs]
//! medium/4 time: [317.51 µs 319.92 µs 322.85 µs]
//! medium/8 time: [638.30 µs 644.68 µs 652.12 µs]
//! medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms]
//! medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms]
//! medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms]
//! medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
//! ```
use anyhow::Context;
use bytes::{Buf, Bytes};
use criterion::{BenchmarkId, Criterion};
use once_cell::sync::Lazy;
use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
use pageserver_api::{key::Key, shard::TenantShardId};
use std::{
future::Future,
sync::Arc,
time::{Duration, Instant},
};
@@ -71,59 +61,40 @@ use tokio::{sync::Barrier, task::JoinSet};
use utils::{id::TenantId, lsn::Lsn};
fn bench(c: &mut Criterion) {
macro_rules! bench_group {
($name:expr, $redo_work:expr) => {{
let name: &str = $name;
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group(name);
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
b.iter_custom(|iters| bench_impl($redo_work, iters, *nclients));
},
);
}
}};
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("short");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::short_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
}
}
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("medium");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::medium_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
}
}
//
// benchmark the protocol implementation
//
let pg_version = 14;
bench_group!(
"ping",
Arc::new(move |mgr: Arc<PostgresRedoManager>| async move {
let _: () = mgr.ping(pg_version).await.unwrap();
})
);
//
// benchmarks with actual record redo
//
let make_redo_work = |req: &'static Request| {
Arc::new(move |mgr: Arc<PostgresRedoManager>| async move {
let page = req.execute(&mgr).await.unwrap();
assert_eq!(page.remaining(), 8192);
})
};
bench_group!("short", {
static REQUEST: Lazy<Request> = Lazy::new(Request::short_input);
make_redo_work(&REQUEST)
});
bench_group!("medium", {
static REQUEST: Lazy<Request> = Lazy::new(Request::medium_input);
make_redo_work(&REQUEST)
});
}
criterion::criterion_group!(benches, bench);
criterion::criterion_main!(benches);
// Returns the sum of each client's wall-clock time spent executing their share of the n_redos.
fn bench_impl<F, Fut>(redo_work: Arc<F>, n_redos: u64, nclients: u64) -> Duration
where
F: Fn(Arc<PostgresRedoManager>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration {
let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
@@ -164,20 +135,17 @@ where
})
}
async fn client<F, Fut>(
async fn client(
mgr: Arc<PostgresRedoManager>,
start: Arc<Barrier>,
redo_work: Arc<F>,
redo_work: Arc<Request>,
n_redos: u64,
) -> Duration
where
F: Fn(Arc<PostgresRedoManager>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
) -> Duration {
start.wait().await;
let start = Instant::now();
for _ in 0..n_redos {
redo_work(Arc::clone(&mgr)).await;
let page = redo_work.execute(&mgr).await.unwrap();
assert_eq!(page.remaining(), 8192);
// The real pageserver will rarely if ever do 2 walredos in a row without
// yielding to the executor.
tokio::task::yield_now().await;

View File

@@ -432,7 +432,7 @@ impl Client {
self.mgmt_api_endpoint
);
self.request(Method::PUT, &uri, req)
self.request(Method::POST, &uri, req)
.await?
.json()
.await
@@ -736,22 +736,4 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}
pub async fn timeline_init_lsn_lease(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<LsnLease> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
self.mgmt_api_endpoint,
);
self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -9,18 +9,41 @@ default = []
[dependencies]
anyhow.workspace = true
async-compression.workspace = true
async-stream.workspace = true
byteorder.workspace = true
bytes.workspace = true
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["string"] }
const_format.workspace = true
consumption_metrics.workspace = true
crossbeam-utils.workspace = true
either.workspace = true
flate2.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
once_cell.workspace = true
pageserver_api.workspace = true
pin-project-lite.workspace = true
rand.workspace = true
smallvec = { workspace = true, features = ["write"] }
svg_fmt.workspace = true
sync_wrapper.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-io-timeout.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
walkdir.workspace = true
metrics.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -8,8 +8,10 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
bytes.workspace = true
camino.workspace = true
clap = { workspace = true, features = ["string"] }
git-version.workspace = true
humantime.workspace = true
pageserver = { path = ".." }
pageserver_api.workspace = true
@@ -22,4 +24,5 @@ toml_edit.workspace = true
utils.workspace = true
svg_fmt.workspace = true
workspace_hack.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -15,7 +15,7 @@ use clap::{Arg, ArgAction, Command};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::config::PageserverIdentity;
use pageserver::controller_upcall_client::ControllerUpcallClient;
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::{COMPUTE_REQUEST_RUNTIME, WALRECEIVER_RUNTIME};
@@ -396,7 +396,7 @@ fn start_pageserver(
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
ControllerUpcallClient::new(conf, &shutdown_pageserver),
ControlPlaneClient::new(conf, &shutdown_pageserver),
conf,
);
if let Some(deletion_workers) = deletion_workers {

View File

@@ -13,6 +13,7 @@ use pageserver_api::{
use remote_storage::{RemotePath, RemoteStorageConfig};
use std::env;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
use utils::logging::SecretString;
use once_cell::sync::OnceCell;
@@ -32,7 +33,7 @@ use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use crate::virtual_file;
use crate::virtual_file::io_engine;
use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME};
use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX};
/// Global state of pageserver.
///
@@ -256,6 +257,17 @@ impl PageServerConf {
.join(timeline_id.to_string())
}
pub(crate) fn timeline_delete_mark_file_path(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Utf8PathBuf {
path_with_suffix_extension(
self.timeline_path(&tenant_shard_id, &timeline_id),
TIMELINE_DELETE_MARK_SUFFIX,
)
}
/// Turns storage remote path of a file into its local path.
pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
remote_path.with_base(&self.workdir)
@@ -324,6 +336,7 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
compact_level0_phase1_value_access: _,
l0_flush,
virtual_file_direct_io,
concurrent_tenant_warmup,
@@ -478,6 +491,11 @@ pub struct ConfigurableSemaphore {
}
impl ConfigurableSemaphore {
pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
Some(x) => x,
None => panic!("const unwrap is not yet stable"),
};
/// Initializse using a non-zero amount of permits.
///
/// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
@@ -498,6 +516,12 @@ impl ConfigurableSemaphore {
}
}
impl Default for ConfigurableSemaphore {
fn default() -> Self {
Self::new(Self::DEFAULT_INITIAL)
}
}
impl PartialEq for ConfigurableSemaphore {
fn eq(&self, other: &Self) -> bool {
// the number of permits can be increased at runtime, so we cannot really fulfill the
@@ -534,6 +558,16 @@ mod tests {
.expect("parse_and_validate");
}
#[test]
fn test_compactl0_phase1_access_mode_is_ignored_silently() {
let input = indoc::indoc! {r#"
[compact_level0_phase1_value_access]
mode = "streaming-kmerge"
validate = "key-lsn-value"
"#};
toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input).unwrap();
}
/// If there's a typo in the pageserver config, we'd rather catch that typo
/// and fail pageserver startup than silently ignoring the typo, leaving whoever
/// made it in the believe that their config change is effective.

View File

@@ -178,7 +178,7 @@ async fn collect_metrics(
)
.await;
if let Err(e) = res {
tracing::error!("failed to upload to remote storage: {e:#}");
tracing::error!("failed to upload to S3: {e:#}");
}
}
};

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use futures::Future;
use pageserver_api::{
controller_api::{AvailabilityZone, NodeRegisterRequest},
controller_api::NodeRegisterRequest,
shard::TenantShardId,
upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
@@ -17,12 +17,9 @@ use utils::{backoff, failpoint_support, generation::Generation, id::NodeId};
use crate::{config::PageServerConf, virtual_file::on_fatal_io_error};
use pageserver_api::config::NodeMetadata;
/// The Pageserver's client for using the storage controller upcall API: this is a small API
/// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
///
/// The server presenting this API may either be the storage controller or some other
/// service (such as the Neon control plane) providing a store of generation numbers.
pub struct ControllerUpcallClient {
/// The Pageserver's client for using the control plane API: this is a small subset
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
pub struct ControlPlaneClient {
http_client: reqwest::Client,
base_url: Url,
node_id: NodeId,
@@ -48,7 +45,7 @@ pub trait ControlPlaneGenerationsApi {
) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
}
impl ControllerUpcallClient {
impl ControlPlaneClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
@@ -117,7 +114,7 @@ impl ControllerUpcallClient {
}
}
impl ControlPlaneGenerationsApi for ControllerUpcallClient {
impl ControlPlaneGenerationsApi for ControlPlaneClient {
/// Block until we get a successful response, or error out if we are shut down
async fn re_attach(
&self,
@@ -151,10 +148,10 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient {
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
match az_id_from_metadata {
Some(az_id) => Some(AvailabilityZone(az_id)),
Some(az_id) => Some(az_id),
None => {
tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
conf.availability_zone.clone().map(AvailabilityZone)
conf.availability_zone.clone()
}
}
};
@@ -219,38 +216,29 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient {
.join("validate")
.expect("Failed to build validate path");
// When sending validate requests, break them up into chunks so that we
// avoid possible edge cases of generating any HTTP requests that
// require database I/O across many thousands of tenants.
let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
for tenant_chunk in (tenants).chunks(128) {
let request = ValidateRequest {
tenants: tenant_chunk
.iter()
.map(|(id, generation)| ValidateRequestTenant {
id: *id,
gen: (*generation).into().expect(
"Generation should always be valid for a Tenant doing deletions",
),
})
.collect(),
};
let request = ValidateRequest {
tenants: tenants
.into_iter()
.map(|(id, gen)| ValidateRequestTenant {
id,
gen: gen
.into()
.expect("Generation should always be valid for a Tenant doing deletions"),
})
.collect(),
};
failpoint_support::sleep_millis_async!(
"control-plane-client-validate-sleep",
&self.cancel
);
if self.cancel.is_cancelled() {
return Err(RetryForeverError::ShuttingDown);
}
let response: ValidateResponse =
self.retry_http_forever(&re_attach_path, request).await?;
for rt in response.tenants {
result.insert(rt.id, rt.valid);
}
failpoint_support::sleep_millis_async!("control-plane-client-validate-sleep", &self.cancel);
if self.cancel.is_cancelled() {
return Err(RetryForeverError::ShuttingDown);
}
Ok(result.into_iter().collect())
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
Ok(response
.tenants
.into_iter()
.map(|rt| (rt.id, rt.valid))
.collect())
}
}

View File

@@ -6,7 +6,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
@@ -622,7 +622,7 @@ impl DeletionQueue {
/// If remote_storage is None, then the returned workers will also be None.
pub fn new<C>(
remote_storage: GenericRemoteStorage,
controller_upcall_client: Option<C>,
control_plane_client: Option<C>,
conf: &'static PageServerConf,
) -> (Self, Option<DeletionQueueWorkers<C>>)
where
@@ -662,7 +662,7 @@ impl DeletionQueue {
conf,
backend_rx,
executor_tx,
controller_upcall_client,
control_plane_client,
lsn_table.clone(),
cancel.clone(),
),
@@ -704,7 +704,7 @@ mod test {
use tokio::task::JoinHandle;
use crate::{
controller_upcall_client::RetryForeverError,
control_plane_client::RetryForeverError,
repository::Key,
tenant::{harness::TenantHarness, storage_layer::DeltaLayerName},
};

View File

@@ -25,8 +25,8 @@ use tracing::info;
use tracing::warn;
use crate::config::PageServerConf;
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
use crate::controller_upcall_client::RetryForeverError;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::metrics;
use crate::virtual_file::MaybeFatalIo;
@@ -61,7 +61,7 @@ where
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
// Client for calling into control plane API for validation of deletes
controller_upcall_client: Option<C>,
control_plane_client: Option<C>,
// DeletionLists which are waiting generation validation. Not safe to
// execute until [`validate`] has processed them.
@@ -94,7 +94,7 @@ where
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
controller_upcall_client: Option<C>,
control_plane_client: Option<C>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
cancel: CancellationToken,
) -> Self {
@@ -102,7 +102,7 @@ where
conf,
rx,
tx,
controller_upcall_client,
control_plane_client,
lsn_table,
pending_lists: Vec::new(),
validated_lists: Vec::new(),
@@ -145,8 +145,8 @@ where
return Ok(());
}
let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client {
match controller_upcall_client
let tenants_valid = if let Some(control_plane_client) = &self.control_plane_client {
match control_plane_client
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
.await
{

View File

@@ -56,7 +56,6 @@ use utils::http::endpoint::request_span;
use utils::http::request::must_parse_query_param;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::pgdatadir_mapping::LsnForTimestamp;
@@ -81,6 +80,7 @@ use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
@@ -589,10 +589,6 @@ async fn timeline_create_handler(
StatusCode::SERVICE_UNAVAILABLE,
HttpErrorBody::from_msg(e.to_string()),
),
Err(e @ tenant::CreateTimelineError::AncestorArchived) => json_response(
StatusCode::NOT_ACCEPTABLE,
HttpErrorBody::from_msg(e.to_string()),
),
Err(tenant::CreateTimelineError::ShuttingDown) => json_response(
StatusCode::SERVICE_UNAVAILABLE,
HttpErrorBody::from_msg("tenant shutting down".to_string()),
@@ -824,7 +820,7 @@ async fn get_lsn_by_timestamp_handler(
let lease = if with_lease {
timeline
.init_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
.make_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
.inspect_err(|_| {
warn!("fail to grant a lease to {}", lsn);
})
@@ -1692,18 +1688,9 @@ async fn lsn_lease_handler(
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let result = async {
timeline
.init_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
.map_err(|e| {
ApiError::InternalServerError(
e.context(format!("invalid lsn lease request at {lsn}")),
)
})
}
.instrument(info_span!("init_lsn_lease", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await?;
let result = timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
.map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;
json_response(StatusCode::OK, result)
}
@@ -1719,13 +1706,8 @@ async fn timeline_gc_handler(
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let gc_result = state
.tenant_manager
.immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)
.await?;
let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
json_response(StatusCode::OK, gc_result)
}
@@ -2973,7 +2955,7 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/preserve_initdb_archive",
|r| api_handler(r, timeline_preserve_initdb_handler),
)
.put(
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/archival_config",
|r| api_handler(r, timeline_archival_config_handler),
)

View File

@@ -6,7 +6,7 @@ pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod controller_upcall_client;
pub mod control_plane_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;

View File

@@ -8,8 +8,6 @@ use metrics::{
};
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, QueryError};
use pq_proto::framed::ConnectionError;
use strum::{EnumCount, VariantNames};
use strum_macros::{IntoStaticStr, VariantNames};
use tracing::warn;
@@ -1179,10 +1177,10 @@ pub(crate) mod virtual_file_io_engine {
}
struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
global_latency_histo: &'a Histogram,
global_metric: &'a Histogram,
// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<&'a Histogram>,
timeline_metric: Option<&'a Histogram>,
ctx: &'c RequestContext,
start: std::time::Instant,
@@ -1214,10 +1212,9 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
elapsed
}
};
self.global_latency_histo
.observe(ex_throttled.as_secs_f64());
if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64());
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
}
}
}
@@ -1243,32 +1240,10 @@ pub enum SmgrQueryType {
#[derive(Debug)]
pub(crate) struct SmgrQueryTimePerTimeline {
global_started: [IntCounter; SmgrQueryType::COUNT],
global_latency: [Histogram; SmgrQueryType::COUNT],
per_timeline_getpage_started: IntCounter,
per_timeline_getpage_latency: Histogram,
global_metrics: [Histogram; SmgrQueryType::COUNT],
per_timeline_getpage: Histogram,
}
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
"pageserver_smgr_query_started_global_count",
"Number of smgr queries started, aggregated by query type.",
&["smgr_query_type"],
)
.expect("failed to define a metric")
});
static SMGR_QUERY_STARTED_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
"pageserver_smgr_query_started_count",
"Number of smgr queries started, aggregated by query type and tenant/timeline.",
&["smgr_query_type", "tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_smgr_query_seconds",
@@ -1344,20 +1319,14 @@ impl SmgrQueryTimePerTimeline {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_slug = format!("{}", tenant_shard_id.shard_slug());
let timeline_id = timeline_id.to_string();
let global_started = std::array::from_fn(|i| {
let op = SmgrQueryType::from_repr(i).unwrap();
SMGR_QUERY_STARTED_GLOBAL
.get_metric_with_label_values(&[op.into()])
.unwrap()
});
let global_latency = std::array::from_fn(|i| {
let global_metrics = std::array::from_fn(|i| {
let op = SmgrQueryType::from_repr(i).unwrap();
SMGR_QUERY_TIME_GLOBAL
.get_metric_with_label_values(&[op.into()])
.unwrap()
});
let per_timeline_getpage_started = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE
let per_timeline_getpage = SMGR_QUERY_TIME_PER_TENANT_TIMELINE
.get_metric_with_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
&tenant_id,
@@ -1365,32 +1334,18 @@ impl SmgrQueryTimePerTimeline {
&timeline_id,
])
.unwrap();
let per_timeline_getpage_latency = SMGR_QUERY_TIME_PER_TENANT_TIMELINE
.get_metric_with_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
&tenant_id,
&shard_slug,
&timeline_id,
])
.unwrap();
Self {
global_started,
global_latency,
per_timeline_getpage_latency,
per_timeline_getpage_started,
global_metrics,
per_timeline_getpage,
}
}
pub(crate) fn start_timer<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a> {
) -> Option<impl Drop + '_> {
let global_metric = &self.global_metrics[op as usize];
let start = Instant::now();
self.global_started[op as usize].inc();
// We subtract time spent throttled from the observed latency.
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
@@ -1409,16 +1364,15 @@ impl SmgrQueryTimePerTimeline {
}
}
let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) {
self.per_timeline_getpage_started.inc();
Some(&self.per_timeline_getpage_latency)
let timeline_metric = if matches!(op, SmgrQueryType::GetPageAtLsn) {
Some(&self.per_timeline_getpage)
} else {
None
};
Some(GlobalAndPerTimelineHistogramTimer {
global_latency_histo: &self.global_latency[op as usize],
per_timeline_latency_histo,
global_metric,
timeline_metric,
ctx,
start,
op,
@@ -1469,12 +1423,9 @@ mod smgr_query_time_tests {
let get_counts = || {
let global: u64 = ops
.iter()
.map(|op| metrics.global_latency[*op as usize].get_sample_count())
.map(|op| metrics.global_metrics[*op as usize].get_sample_count())
.sum();
(
global,
metrics.per_timeline_getpage_latency.get_sample_count(),
)
(global, metrics.per_timeline_getpage.get_sample_count())
};
let (pre_global, pre_per_tenant_timeline) = get_counts();
@@ -1510,7 +1461,6 @@ static COMPUTE_STARTUP_BUCKETS: Lazy<[f64; 28]> = Lazy::new(|| {
pub(crate) struct BasebackupQueryTime {
ok: Histogram,
error: Histogram,
client_error: Histogram,
}
pub(crate) static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|| {
@@ -1524,7 +1474,6 @@ pub(crate) static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|
BasebackupQueryTime {
ok: vec.get_metric_with_label_values(&["ok"]).unwrap(),
error: vec.get_metric_with_label_values(&["error"]).unwrap(),
client_error: vec.get_metric_with_label_values(&["client_error"]).unwrap(),
}
});
@@ -1538,7 +1487,7 @@ impl BasebackupQueryTime {
pub(crate) fn start_recording<'c: 'a, 'a>(
&'a self,
ctx: &'c RequestContext,
) -> BasebackupQueryTimeOngoingRecording<'a, 'a> {
) -> BasebackupQueryTimeOngoingRecording<'_, '_> {
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
@@ -1561,7 +1510,7 @@ impl BasebackupQueryTime {
}
impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
pub(crate) fn observe<T>(self, res: &Result<T, QueryError>) {
pub(crate) fn observe<T, E>(self, res: &Result<T, E>) {
let elapsed = self.start.elapsed();
let ex_throttled = self
.ctx
@@ -1580,15 +1529,10 @@ impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
elapsed
}
};
// If you want to change categorize of a specific error, also change it in `log_query_error`.
let metric = match res {
Ok(_) => &self.parent.ok,
Err(QueryError::Disconnected(ConnectionError::Io(io_error)))
if is_expected_io_error(io_error) =>
{
&self.parent.client_error
}
Err(_) => &self.parent.error,
let metric = if res.is_ok() {
&self.parent.ok
} else {
&self.parent.error
};
metric.observe(ex_throttled.as_secs_f64());
}
@@ -1833,7 +1777,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| {
.expect("failed to define a metric"),
upload_heatmap_duration: register_histogram!(
"pageserver_secondary_upload_heatmap_duration",
"Time to build and upload a heatmap, including any waiting inside the remote storage client"
"Time to build and upload a heatmap, including any waiting inside the S3 client"
)
.expect("failed to define a metric"),
download_heatmap: register_int_counter!(
@@ -2632,12 +2576,6 @@ impl TimelineMetrics {
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
}
let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
tenant_id,
shard_id,
timeline_id,
]);
let _ = SMGR_QUERY_TIME_PER_TENANT_TIMELINE.remove_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
tenant_id,
@@ -2654,8 +2592,6 @@ pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
}
tenant_throttling::remove_tenant_metrics(tenant_shard_id);
// we leave the BROKEN_TENANTS_SET entry if any
}
@@ -3119,173 +3055,41 @@ pub mod tokio_epoll_uring {
pub(crate) mod tenant_throttling {
use metrics::{register_int_counter_vec, IntCounter};
use once_cell::sync::Lazy;
use utils::shard::TenantShardId;
use crate::tenant::{self, throttle::Metric};
struct GlobalAndPerTenantIntCounter {
global: IntCounter,
per_tenant: IntCounter,
}
impl GlobalAndPerTenantIntCounter {
#[inline(always)]
pub(crate) fn inc(&self) {
self.inc_by(1)
}
#[inline(always)]
pub(crate) fn inc_by(&self, n: u64) {
self.global.inc_by(n);
self.per_tenant.inc_by(n);
}
}
pub(crate) struct TimelineGet {
count_accounted_start: GlobalAndPerTenantIntCounter,
count_accounted_finish: GlobalAndPerTenantIntCounter,
wait_time: GlobalAndPerTenantIntCounter,
count_throttled: GlobalAndPerTenantIntCounter,
wait_time: IntCounter,
count: IntCounter,
}
static COUNT_ACCOUNTED_START: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_accounted_start_global",
"Count of tenant throttling starts, by kind of throttle.",
&["kind"]
)
.unwrap()
});
static COUNT_ACCOUNTED_START_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_accounted_start",
"Count of tenant throttling starts, by kind of throttle.",
&["kind", "tenant_id", "shard_id"]
)
.unwrap()
});
static COUNT_ACCOUNTED_FINISH: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_accounted_finish_global",
"Count of tenant throttling finishes, by kind of throttle.",
&["kind"]
)
.unwrap()
});
static COUNT_ACCOUNTED_FINISH_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_accounted_finish",
"Count of tenant throttling finishes, by kind of throttle.",
&["kind", "tenant_id", "shard_id"]
)
.unwrap()
});
static WAIT_USECS: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
pub(crate) static TIMELINE_GET: Lazy<TimelineGet> = Lazy::new(|| {
static WAIT_USECS: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_wait_usecs_sum_global",
"Sum of microseconds that spent waiting throttle by kind of throttle.",
"Sum of microseconds that tenants spent waiting for a tenant throttle of a given kind.",
&["kind"]
)
.unwrap()
});
static WAIT_USECS_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_wait_usecs_sum",
"Sum of microseconds that spent waiting throttle by kind of throttle.",
&["kind", "tenant_id", "shard_id"]
)
.unwrap()
.unwrap()
});
static WAIT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_global",
"Count of tenant throttlings, by kind of throttle.",
&["kind"]
)
.unwrap()
});
let kind = "timeline_get";
TimelineGet {
wait_time: WAIT_USECS.with_label_values(&[kind]),
count: WAIT_COUNT.with_label_values(&[kind]),
}
});
static WAIT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_global",
"Count of tenant throttlings, by kind of throttle.",
&["kind"]
)
.unwrap()
});
static WAIT_COUNT_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count",
"Count of tenant throttlings, by kind of throttle.",
&["kind", "tenant_id", "shard_id"]
)
.unwrap()
});
const KIND: &str = "timeline_get";
impl TimelineGet {
pub(crate) fn new(tenant_shard_id: &TenantShardId) -> Self {
let per_tenant_label_values = &[
KIND,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
];
TimelineGet {
count_accounted_start: {
GlobalAndPerTenantIntCounter {
global: COUNT_ACCOUNTED_START.with_label_values(&[KIND]),
per_tenant: COUNT_ACCOUNTED_START_PER_TENANT
.with_label_values(per_tenant_label_values),
}
},
count_accounted_finish: {
GlobalAndPerTenantIntCounter {
global: COUNT_ACCOUNTED_FINISH.with_label_values(&[KIND]),
per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT
.with_label_values(per_tenant_label_values),
}
},
wait_time: {
GlobalAndPerTenantIntCounter {
global: WAIT_USECS.with_label_values(&[KIND]),
per_tenant: WAIT_USECS_PER_TENANT
.with_label_values(per_tenant_label_values),
}
},
count_throttled: {
GlobalAndPerTenantIntCounter {
global: WAIT_COUNT.with_label_values(&[KIND]),
per_tenant: WAIT_COUNT_PER_TENANT
.with_label_values(per_tenant_label_values),
}
},
}
}
}
pub(crate) fn preinitialize_global_metrics() {
Lazy::force(&COUNT_ACCOUNTED_START);
Lazy::force(&COUNT_ACCOUNTED_FINISH);
Lazy::force(&WAIT_USECS);
Lazy::force(&WAIT_COUNT);
}
pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
for m in &[
&COUNT_ACCOUNTED_START_PER_TENANT,
&COUNT_ACCOUNTED_FINISH_PER_TENANT,
&WAIT_USECS_PER_TENANT,
&WAIT_COUNT_PER_TENANT,
] {
let _ = m.remove_label_values(&[
KIND,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
]);
}
}
impl Metric for TimelineGet {
#[inline(always)]
fn accounting_start(&self) {
self.count_accounted_start.inc();
}
#[inline(always)]
fn accounting_finish(&self) {
self.count_accounted_finish.inc();
}
impl Metric for &'static TimelineGet {
#[inline(always)]
fn observe_throttling(
&self,
@@ -3293,7 +3097,7 @@ pub(crate) mod tenant_throttling {
) {
let val = u64::try_from(wait_time.as_micros()).unwrap();
self.wait_time.inc_by(val);
self.count_throttled.inc();
self.count.inc();
}
}
}
@@ -3423,14 +3227,11 @@ pub fn preinitialize_metrics() {
}
// countervecs
[
&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT,
&SMGR_QUERY_STARTED_GLOBAL,
]
.into_iter()
.for_each(|c| {
Lazy::force(c);
});
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
.into_iter()
.for_each(|c| {
Lazy::force(c);
});
// gauges
WALRECEIVER_ACTIVE_MANAGERS.get();
@@ -3452,8 +3253,7 @@ pub fn preinitialize_metrics() {
// Custom
Lazy::force(&RECONSTRUCT_TIME);
Lazy::force(&tenant_throttling::TIMELINE_GET);
Lazy::force(&BASEBACKUP_QUERY_TIME);
Lazy::force(&COMPUTE_COMMANDS_COUNTERS);
tenant_throttling::preinitialize_global_metrics();
}

View File

@@ -273,20 +273,10 @@ async fn page_service_conn_main(
info!("Postgres client disconnected ({io_error})");
Ok(())
} else {
let tenant_id = conn_handler.timeline_handles.tenant_id();
Err(io_error).context(format!(
"Postgres connection error for tenant_id={:?} client at peer_addr={}",
tenant_id, peer_addr
))
Err(io_error).context("Postgres connection error")
}
}
other => {
let tenant_id = conn_handler.timeline_handles.tenant_id();
other.context(format!(
"Postgres query error for tenant_id={:?} client peer_addr={}",
tenant_id, peer_addr
))
}
other => other.context("Postgres query error"),
}
}
@@ -350,10 +340,6 @@ impl TimelineHandles {
}
})
}
fn tenant_id(&self) -> Option<TenantId> {
self.wrapper.tenant_id.get().copied()
}
}
pub(crate) struct TenantManagerWrapper {
@@ -833,7 +819,7 @@ impl PageServerHandler {
set_tracing_field_shard_id(&timeline);
let lease = timeline
.renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
.inspect_err(|e| {
warn!("{e}");
})

View File

@@ -840,36 +840,6 @@ impl Timeline {
Ok(total_size * BLCKSZ as u64)
}
/// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
/// for gc-compaction.
///
/// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
/// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
/// be kept only for a specific range of LSN.
///
/// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
/// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
/// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
/// determine which keys to retain/drop for gc-compaction.
///
/// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
/// to be retained for each of the branch LSN.
///
/// The return value is (dense keyspace, sparse keyspace).
pub(crate) async fn collect_gc_compaction_keyspace(
&self,
) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
let metadata_key_begin = Key::metadata_key_range().start;
let aux_v1_key = AUX_FILES_KEY;
let dense_keyspace = KeySpace {
ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
};
Ok((
dense_keyspace,
SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
))
}
///
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed maybe removed from the underlying storage (from

View File

@@ -18,10 +18,10 @@ use camino::Utf8Path;
use camino::Utf8PathBuf;
use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::LsnLease;
use pageserver_api::models::TimelineArchivalState;
use pageserver_api::models::TimelineState;
use pageserver_api::models::TopTenantShardItem;
@@ -34,7 +34,6 @@ use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
@@ -141,7 +140,6 @@ pub mod metadata;
pub mod remote_timeline_client;
pub mod storage_layer;
pub mod checks;
pub mod config;
pub mod mgr;
pub mod secondary;
@@ -183,54 +181,27 @@ pub struct TenantSharedResources {
pub(super) struct AttachedTenantConf {
tenant_conf: TenantConfOpt,
location: AttachedLocationConfig,
/// The deadline before which we are blocked from GC so that
/// leases have a chance to be renewed.
lsn_lease_deadline: Option<tokio::time::Instant>,
}
impl AttachedTenantConf {
fn new(tenant_conf: TenantConfOpt, location: AttachedLocationConfig) -> Self {
// Sets a deadline before which we cannot proceed to GC due to lsn lease.
//
// We do this as the leases mapping are not persisted to disk. By delaying GC by lease
// length, we guarantee that all the leases we granted before will have a chance to renew
// when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle.
let lsn_lease_deadline = if location.attach_mode == AttachmentMode::Single {
Some(
tokio::time::Instant::now()
+ tenant_conf
.lsn_lease_length
.unwrap_or(LsnLease::DEFAULT_LENGTH),
)
} else {
// We don't use `lsn_lease_deadline` to delay GC in AttachedMulti and AttachedStale
// because we don't do GC in these modes.
None
};
Self {
tenant_conf,
location,
lsn_lease_deadline,
}
}
fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
match &location_conf.mode {
LocationMode::Attached(attach_conf) => {
Ok(Self::new(location_conf.tenant_conf, *attach_conf))
}
LocationMode::Attached(attach_conf) => Ok(Self {
tenant_conf: location_conf.tenant_conf,
location: *attach_conf,
}),
LocationMode::Secondary(_) => {
anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
}
}
}
fn is_gc_blocked_by_lsn_lease_deadline(&self) -> bool {
self.lsn_lease_deadline
.map(|d| tokio::time::Instant::now() < d)
.unwrap_or(false)
}
}
struct TimelinePreload {
timeline_id: TimelineId,
@@ -330,7 +301,7 @@ pub struct Tenant {
/// Throttle applied at the top of [`Timeline::get`].
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
pub(crate) timeline_get_throttle:
Arc<throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
/// An ongoing timeline detach concurrency limiter.
///
@@ -591,8 +562,6 @@ pub enum CreateTimelineError {
AncestorLsn(anyhow::Error),
#[error("ancestor timeline is not active")]
AncestorNotActive,
#[error("ancestor timeline is archived")]
AncestorArchived,
#[error("tenant shutting down")]
ShuttingDown,
#[error(transparent)]
@@ -1061,9 +1030,13 @@ impl Tenant {
}
Ok(TenantPreload {
timelines: self
.load_timelines_metadata(remote_timeline_ids, remote_storage, cancel)
.await?,
timelines: Self::load_timeline_metadata(
self,
remote_timeline_ids,
remote_storage,
cancel,
)
.await?,
})
}
@@ -1329,7 +1302,7 @@ impl Tenant {
.await
}
async fn load_timelines_metadata(
async fn load_timeline_metadata(
self: &Arc<Tenant>,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
@@ -1337,10 +1310,33 @@ impl Tenant {
) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
let mut part_downloads = JoinSet::new();
for timeline_id in timeline_ids {
let client = RemoteTimelineClient::new(
remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
self.tenant_shard_id,
timeline_id,
self.generation,
);
let cancel_clone = cancel.clone();
part_downloads.spawn(
self.load_timeline_metadata(timeline_id, remote_storage.clone(), cancel_clone)
.instrument(info_span!("download_index_part", %timeline_id)),
async move {
debug!("starting index part download");
let index_part = client.download_index_file(&cancel_clone).await;
debug!("finished index part download");
Result::<_, anyhow::Error>::Ok(TimelinePreload {
client,
timeline_id,
index_part,
})
}
.map(move |res| {
res.with_context(|| format!("download index part for timeline {timeline_id}"))
})
.instrument(info_span!("download_index_part", %timeline_id)),
);
}
@@ -1351,7 +1347,8 @@ impl Tenant {
next = part_downloads.join_next() => {
match next {
Some(result) => {
let preload = result.context("join preload task")?;
let preload_result = result.context("join preload task")?;
let preload = preload_result?;
timeline_preloads.insert(preload.timeline_id, preload);
},
None => {
@@ -1368,36 +1365,6 @@ impl Tenant {
Ok(timeline_preloads)
}
fn load_timeline_metadata(
self: &Arc<Tenant>,
timeline_id: TimelineId,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
) -> impl Future<Output = TimelinePreload> {
let client = RemoteTimelineClient::new(
remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
self.tenant_shard_id,
timeline_id,
self.generation,
);
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
debug!("starting index part download");
let index_part = client.download_index_file(&cancel).await;
debug!("finished index part download");
TimelinePreload {
client,
timeline_id,
index_part,
}
}
}
pub(crate) async fn apply_timeline_archival_config(
&self,
timeline_id: TimelineId,
@@ -1606,9 +1573,6 @@ impl Tenant {
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
use checks::check_valid_layermap;
use itertools::Itertools;
let tline = self
.create_test_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
.await?;
@@ -1623,18 +1587,6 @@ impl Tenant {
.force_create_image_layer(lsn, images, Some(initdb_lsn), ctx)
.await?;
}
let layer_names = tline
.layers
.read()
.await
.layer_map()
.unwrap()
.iter_historic_layers()
.map(|layer| layer.layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("invalid layermap: {err}");
}
Ok(tline)
}
@@ -1728,11 +1680,6 @@ impl Tenant {
return Err(CreateTimelineError::AncestorNotActive);
}
if ancestor_timeline.is_archived() == Some(true) {
info!("tried to branch archived timeline");
return Err(CreateTimelineError::AncestorArchived);
}
if let Some(lsn) = ancestor_start_lsn.as_mut() {
*lsn = lsn.align();
@@ -1850,11 +1797,6 @@ impl Tenant {
info!("Skipping GC in location state {:?}", conf.location);
return Ok(GcResult::default());
}
if conf.is_gc_blocked_by_lsn_lease_deadline() {
info!("Skipping GC because lsn lease deadline is not reached");
return Ok(GcResult::default());
}
}
let _guard = match self.gc_block.start().await {
@@ -2008,6 +1950,9 @@ impl Tenant {
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
}
TenantState::Loading => {
*current_state = TenantState::Activating(ActivatingFrom::Loading);
}
TenantState::Attaching => {
*current_state = TenantState::Activating(ActivatingFrom::Attaching);
}
@@ -2188,7 +2133,7 @@ impl Tenant {
async fn set_stopping(
&self,
progress: completion::Barrier,
_allow_transition_from_loading: bool,
allow_transition_from_loading: bool,
allow_transition_from_attaching: bool,
) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
@@ -2203,6 +2148,7 @@ impl Tenant {
);
false
}
TenantState::Loading => allow_transition_from_loading,
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
})
.await
@@ -2221,6 +2167,13 @@ impl Tenant {
*current_state = TenantState::Stopping { progress };
true
}
TenantState::Loading => {
if !allow_transition_from_loading {
unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
};
*current_state = TenantState::Stopping { progress };
true
}
TenantState::Active => {
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
// are created after the transition to Stopping. That's harmless, as the Timelines
@@ -2276,7 +2229,7 @@ impl Tenant {
// The load & attach routines own the tenant state until it has reached `Active`.
// So, wait until it's done.
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Attaching => {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
@@ -2296,7 +2249,7 @@ impl Tenant {
let reason = reason.to_string();
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Activating(_) | TenantState::Attaching => {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
@@ -2340,7 +2293,7 @@ impl Tenant {
loop {
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Attaching | TenantState::Activating(_) => {
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
// in these states, there's a chance that we can reach ::Active
self.activate_now();
match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
@@ -2663,8 +2616,6 @@ impl Tenant {
Arc::new(AttachedTenantConf {
tenant_conf: new_tenant_conf.clone(),
location: inner.location,
// Attached location is not changed, no need to update lsn lease deadline.
lsn_lease_deadline: inner.lsn_lease_deadline,
})
});
@@ -2864,7 +2815,7 @@ impl Tenant {
gate: Gate::default(),
timeline_get_throttle: Arc::new(throttle::Throttle::new(
Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf),
crate::metrics::tenant_throttling::TimelineGet::new(&tenant_shard_id),
&crate::metrics::tenant_throttling::TIMELINE_GET,
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
@@ -3246,9 +3197,6 @@ impl Tenant {
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
use checks::check_valid_layermap;
use itertools::Itertools;
let tline = self
.branch_timeline_test(src_timeline, dst_id, ancestor_lsn, ctx)
.await?;
@@ -3269,18 +3217,6 @@ impl Tenant {
.force_create_image_layer(lsn, images, Some(ancestor_lsn), ctx)
.await?;
}
let layer_names = tline
.layers
.read()
.await
.layer_map()
.unwrap()
.iter_historic_layers()
.map(|layer| layer.layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("invalid layermap: {err}");
}
Ok(tline)
}
@@ -3658,7 +3594,7 @@ impl Tenant {
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<UninitializedTimeline<'a>> {
) -> anyhow::Result<UninitializedTimeline> {
let tenant_shard_id = self.tenant_shard_id;
let resources = self.build_timeline_resources(new_timeline_id);
@@ -3922,9 +3858,9 @@ async fn run_initdb(
let _permit = INIT_DB_SEMAPHORE.acquire().await;
let initdb_command = tokio::process::Command::new(&initdb_bin_path)
.args(["--pgdata", initdb_target_dir.as_ref()])
.args(["--username", &conf.superuser])
.args(["--encoding", "utf8"])
.args(["-D", initdb_target_dir.as_ref()])
.args(["-U", &conf.superuser])
.args(["-E", "utf8"])
.arg("--no-instructions")
.arg("--no-sync")
.env_clear()
@@ -4175,7 +4111,7 @@ pub(crate) mod harness {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
TenantState::Loading,
self.conf,
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt::from(self.tenant_conf.clone()),
@@ -4228,18 +4164,9 @@ pub(crate) mod harness {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
if records_neon {
// For Neon wal records, we can decode without spawning postgres, so do so.
let mut page = match (base_img, records.first()) {
(Some((_lsn, img)), _) => {
let mut page = BytesMut::new();
page.extend_from_slice(&img);
page
}
(_, Some((_lsn, rec))) if rec.will_init() => BytesMut::new(),
_ => {
panic!("Neon WAL redo requires base image or will init record");
}
};
let base_img = base_img.expect("Neon WAL redo requires base image").1;
let mut page = BytesMut::new();
page.extend_from_slice(&base_img);
for (record_lsn, record) in records {
apply_neon::apply_in_neon(&record, record_lsn, key, &mut page)?;
}
@@ -4496,17 +4423,13 @@ mod tests {
tline.freeze_and_flush().await.map_err(|e| e.into())
}
#[tokio::test(start_paused = true)]
#[tokio::test]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")
.await?
.load()
.await;
// Advance to the lsn lease deadline so that GC is not blocked by
// initial transition into AttachedSingle.
tokio::time::advance(tenant.get_lsn_lease_length()).await;
tokio::time::resume();
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -7283,17 +7206,9 @@ mod tests {
Ok(())
}
#[tokio::test(start_paused = true)]
#[tokio::test]
async fn test_lsn_lease() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_lsn_lease")
.await
.unwrap()
.load()
.await;
// Advance to the lsn lease deadline so that GC is not blocked by
// initial transition into AttachedSingle.
tokio::time::advance(tenant.get_lsn_lease_length()).await;
tokio::time::resume();
let (tenant, ctx) = TenantHarness::create("test_lsn_lease").await?.load().await;
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_lsn = Lsn(0x100);
@@ -7321,33 +7236,24 @@ mod tests {
let leased_lsns = [0x30, 0x50, 0x70];
let mut leases = Vec::new();
leased_lsns.iter().for_each(|n| {
leases.push(
timeline
.init_lsn_lease(Lsn(*n), timeline.get_lsn_lease_length(), &ctx)
.expect("lease request should succeed"),
);
let _: anyhow::Result<_> = leased_lsns.iter().try_for_each(|n| {
leases.push(timeline.make_lsn_lease(Lsn(*n), timeline.get_lsn_lease_length(), &ctx)?);
Ok(())
});
let updated_lease_0 = timeline
.renew_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)
.expect("lease renewal should succeed");
assert_eq!(
updated_lease_0.valid_until, leases[0].valid_until,
" Renewing with shorter lease should not change the lease."
);
// Renewing with shorter lease should not change the lease.
let updated_lease_0 =
timeline.make_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)?;
assert_eq!(updated_lease_0.valid_until, leases[0].valid_until);
let updated_lease_1 = timeline
.renew_lsn_lease(
Lsn(leased_lsns[1]),
timeline.get_lsn_lease_length() * 2,
&ctx,
)
.expect("lease renewal should succeed");
assert!(
updated_lease_1.valid_until > leases[1].valid_until,
"Renewing with a long lease should renew lease with later expiration time."
);
// Renewing with a long lease should renew lease with later expiration time.
let updated_lease_1 = timeline.make_lsn_lease(
Lsn(leased_lsns[1]),
timeline.get_lsn_lease_length() * 2,
&ctx,
)?;
assert!(updated_lease_1.valid_until > leases[1].valid_until);
// Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
info!(
@@ -7364,8 +7270,7 @@ mod tests {
&CancellationToken::new(),
&ctx,
)
.await
.unwrap();
.await?;
// Keeping everything <= Lsn(0x80) b/c leases:
// 0/10: initdb layer
@@ -7379,16 +7284,13 @@ mod tests {
// Make lease on a already GC-ed LSN.
// 0/80 does not have a valid lease + is below latest_gc_cutoff
assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
timeline
.init_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx)
.expect_err("lease request on GC-ed LSN should fail");
let res = timeline.make_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx);
assert!(res.is_err());
// Should still be able to renew a currently valid lease
// Assumption: original lease to is still valid for 0/50.
// (use `Timeline::init_lsn_lease` for testing so it always does validation)
timeline
.init_lsn_lease(Lsn(leased_lsns[1]), timeline.get_lsn_lease_length(), &ctx)
.expect("lease renewal with validation should succeed");
let _ =
timeline.make_lsn_lease(Lsn(leased_lsns[1]), timeline.get_lsn_lease_length(), &ctx)?;
Ok(())
}
@@ -8590,7 +8492,6 @@ mod tests {
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
let (tenant, ctx) = harness.load().await;
let will_init_keys = [2, 6];
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
key.field6 = id;
@@ -8640,25 +8541,18 @@ mod tests {
}
};
let will_init = will_init_keys.contains(&i);
if will_init {
delta_layer_spec.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init())));
expected_key_values.insert(key, "".to_string());
} else {
let delta = format!("@{lsn}");
delta_layer_spec.push((
key,
lsn,
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
));
expected_key_values
.get_mut(&key)
.expect("An image exists for each key")
.push_str(delta.as_str());
}
let delta = format!("@{lsn}");
delta_layer_spec.push((
key,
lsn,
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
));
delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);
expected_key_values
.get_mut(&key)
.expect("An image exists for each key")
.push_str(delta.as_str());
}
delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);

View File

@@ -1,56 +0,0 @@
use std::collections::BTreeSet;
use itertools::Itertools;
use super::storage_layer::LayerName;
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
///
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// | | | |
/// | 1 | | 2 | | 3 |
/// | | | | | |
/// ```
///
/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have
/// the same LSN range.
///
/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example,
///
/// ```plain
/// | | | 2 | | |
/// | 1 | |-------| | 3 |
/// | | | 4 | | |
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
let mut all_delta_layers = Vec::new();
for name in metadata {
if let LayerName::Delta(layer) = name {
if layer.key_range.start.next() != layer.key_range.end {
all_delta_layers.push(layer.clone());
}
}
}
for layer in &all_delta_layers {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
}
for layer in &all_delta_layers {
let lsn_range = layer.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
layer,
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
return Some(err);
}
}
None
}

View File

@@ -6,7 +6,6 @@ use super::remote_timeline_client::index::GcBlockingReason;
type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
/// GcBlock provides persistent (per-timeline) gc blocking.
#[derive(Default)]
pub(crate) struct GcBlock {
/// The timelines which have current reasons to block gc.
@@ -14,12 +13,6 @@ pub(crate) struct GcBlock {
/// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
/// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
reasons: std::sync::Mutex<Storage>,
/// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
///
/// Do not add any more features taking and forbidding taking this lock. It should be
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
/// synchronizes with gc attempts by locking and unlocking this mutex.
blocking: tokio::sync::Mutex<()>,
}
@@ -49,9 +42,6 @@ impl GcBlock {
}
}
/// Describe the current gc blocking reasons.
///
/// TODO: make this json serializable.
pub(crate) fn summary(&self) -> Option<BlockingReasons> {
let g = self.reasons.lock().unwrap();

View File

@@ -30,8 +30,8 @@ use utils::{backoff, completion, crashsafe};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::controller_upcall_client::{
ControlPlaneGenerationsApi, ControllerUpcallClient, RetryForeverError,
use crate::control_plane_client::{
ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
@@ -122,7 +122,7 @@ pub(crate) enum ShardSelector {
Known(ShardIndex),
}
/// A convenience for use with the re_attach ControllerUpcallClient function: rather
/// A convenience for use with the re_attach ControlPlaneClient function: rather
/// than the serializable struct, we build this enum that encapsulates
/// the invariant that attached tenants always have generations.
///
@@ -219,11 +219,7 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
+ TEMP_FILE_SUFFIX;
let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
fs::rename(path.as_ref(), &tmp_path).await?;
fs::File::open(parent)
.await?
.sync_all()
.await
.maybe_fatal_err("safe_rename_tenant_dir")?;
fs::File::open(parent).await?.sync_all().await?;
Ok(tmp_path)
}
@@ -345,7 +341,7 @@ async fn init_load_generations(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else if let Some(client) = ControllerUpcallClient::new(conf, cancel) {
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
info!("Calling control plane API to re-attach tenants");
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach(conf).await {
@@ -2197,82 +2193,6 @@ impl TenantManager {
Ok((wanted_bytes, shard_count as u32))
}
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
pub(crate) async fn immediate_gc(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<GcResult, ApiError> {
let tenant = {
let guard = self.tenants.read().unwrap();
guard
.get(&tenant_shard_id)
.cloned()
.with_context(|| format!("tenant {tenant_shard_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?
};
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// Run in task_mgr to avoid race with tenant_detach operation
let ctx: RequestContext =
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
fail::fail_point!("immediate_gc_task_pre");
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")]
{
// we need to synchronize with drop completion for python tests without polling for
// log messages
if let Ok(result) = result.as_mut() {
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
}
tracing::info!(
total = js.len(),
"starting to wait for the gc'd layers to be dropped"
);
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
}
}
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().map(|x| &x.remote_client);
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
result.map_err(|e| match e {
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
GcError::TimelineNotFound => {
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
}
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})
}
}
#[derive(Debug, thiserror::Error)]
@@ -2417,7 +2337,7 @@ enum TenantSlotDropError {
/// Errors that can happen any time we are walking the tenant map to try and acquire
/// the TenantSlot for a particular tenant.
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapError {
pub enum TenantMapError {
// Tried to read while initializing
#[error("tenant map is still initializing")]
StillInitializing,
@@ -2447,7 +2367,7 @@ pub(crate) enum TenantMapError {
/// The `old_value` may be dropped before the SlotGuard is dropped, by calling
/// `drop_old_value`. It is an error to call this without shutting down
/// the conents of `old_value`.
pub(crate) struct SlotGuard {
pub struct SlotGuard {
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
upserted: bool,
@@ -2840,6 +2760,81 @@ use {
utils::http::error::ApiError,
};
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
pub(crate) async fn immediate_gc(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<GcResult, ApiError> {
let tenant = {
let guard = TENANTS.read().unwrap();
guard
.get(&tenant_shard_id)
.cloned()
.with_context(|| format!("tenant {tenant_shard_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?
};
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// Run in task_mgr to avoid race with tenant_detach operation
let ctx: RequestContext =
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
fail::fail_point!("immediate_gc_task_pre");
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")]
{
// we need to synchronize with drop completion for python tests without polling for
// log messages
if let Ok(result) = result.as_mut() {
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
}
tracing::info!(
total = js.len(),
"starting to wait for the gc'd layers to be dropped"
);
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
}
}
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().map(|x| &x.remote_client);
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
result.map_err(|e| match e {
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
GcError::TimelineNotFound => {
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
}
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

View File

@@ -178,7 +178,6 @@ async fn download_object<'a>(
destination_file
.flush()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("flush source file at {dst_path}"))
.map_err(DownloadError::Other)?;
@@ -186,7 +185,6 @@ async fn download_object<'a>(
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
@@ -234,7 +232,6 @@ async fn download_object<'a>(
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;

View File

@@ -1,13 +1,13 @@
//! Common traits and structs for layers
pub mod delta_layer;
pub mod filter_iterator;
pub mod image_layer;
pub mod inmemory_layer;
pub(crate) mod layer;
mod layer_desc;
mod layer_name;
pub mod merge_iterator;
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
@@ -276,20 +276,10 @@ pub(crate) enum LayerId {
InMemoryLayerId(InMemoryLayerFileId),
}
/// Uniquely identify a layer visit by the layer
/// and LSN floor (or start LSN) of the reads.
/// The layer itself is not enough since we may
/// have different LSN lower bounds for delta layer reads.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct LayerToVisitId {
layer_id: LayerId,
lsn_floor: Lsn,
}
/// Layer wrapper for the read path. Note that it is valid
/// to use these layers even after external operations have
/// been performed on them (compaction, freeze, etc.).
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum ReadableLayer {
PersistentLayer(Layer),
InMemoryLayer(Arc<InMemoryLayer>),
@@ -297,11 +287,13 @@ pub(crate) enum ReadableLayer {
/// A partial description of a read to be done.
#[derive(Debug, Clone)]
struct LayerVisit {
struct ReadDesc {
/// An id used to resolve the readable layer within the fringe
layer_to_visit_id: LayerToVisitId,
layer_id: LayerId,
/// Lsn range for the read, used for selecting the next read
lsn_range: Range<Lsn>,
/// This read's index in [`LayerKeyspace::reads`];
read_id: LayerKeyspaceReadId,
}
/// Data structure which maintains a fringe of layers for the
@@ -313,46 +305,52 @@ struct LayerVisit {
/// a two layer indexing scheme.
#[derive(Debug)]
pub(crate) struct LayerFringe {
planned_visits_by_lsn: BinaryHeap<LayerVisit>,
visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
planned_reads_by_lsn: BinaryHeap<ReadDesc>,
layers: HashMap<LayerId, LayerKeyspace>,
}
#[derive(Debug)]
struct LayerVisitReads {
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: KeySpaceRandomAccum,
next_read_id: LayerKeyspaceReadId,
reads: HashMap<LayerKeyspaceReadId, (Range<Lsn>, KeySpace)>,
}
#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
struct LayerKeyspaceReadId(usize);
impl LayerFringe {
pub(crate) fn new() -> Self {
LayerFringe {
planned_visits_by_lsn: BinaryHeap::new(),
visit_reads: HashMap::new(),
planned_reads_by_lsn: BinaryHeap::new(),
layers: HashMap::new(),
}
}
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
let read_desc = match self.planned_visits_by_lsn.pop() {
let read_desc = match self.planned_reads_by_lsn.pop() {
Some(desc) => desc,
None => return None,
};
let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
let mut entry = match self.layers.entry(read_desc.layer_id) {
Entry::Occupied(o) => o,
Entry::Vacant(_) => unreachable!("fringe internals are always consistent"),
};
match removed {
Some((
_,
LayerVisitReads {
layer,
mut target_keyspace,
},
)) => Some((
layer,
target_keyspace.consume_keyspace(),
read_desc.lsn_range,
)),
None => unreachable!("fringe internals are always consistent"),
let (lsn_range, keyspace) = entry
.get_mut()
.reads
.remove(&read_desc.read_id)
.expect("fringe internals are always consistent");
let layer = entry.get().layer.clone();
if entry.get().reads.is_empty() {
entry.remove();
}
Some((layer, keyspace, lsn_range))
}
pub(crate) fn update(
@@ -361,26 +359,35 @@ impl LayerFringe {
keyspace: KeySpace,
lsn_range: Range<Lsn>,
) {
let layer_to_visit_id = LayerToVisitId {
layer_id: layer.id(),
lsn_floor: lsn_range.start,
};
let entry = self.visit_reads.entry(layer_to_visit_id.clone());
let layer_id = layer.id();
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.add_keyspace(keyspace);
let read_id = {
let r = &mut entry.get_mut().next_read_id;
let read_id = *r;
*r = LayerKeyspaceReadId(r.0 + 1);
read_id
};
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range: lsn_range.clone(),
layer_id: layer_id.clone(),
read_id,
});
let replaced = entry.get_mut().reads.insert(read_id, (lsn_range, keyspace));
assert!(replaced.is_none());
}
Entry::Vacant(entry) => {
self.planned_visits_by_lsn.push(LayerVisit {
lsn_range,
layer_to_visit_id: layer_to_visit_id.clone(),
let read_id = LayerKeyspaceReadId(0);
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range: lsn_range.clone(),
layer_id: layer_id.clone(),
read_id,
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerVisitReads {
entry.insert(LayerKeyspace {
layer,
target_keyspace: accum,
next_read_id: LayerKeyspaceReadId(1),
reads: [(read_id, (lsn_range, keyspace))].into(),
});
}
}
@@ -393,7 +400,7 @@ impl Default for LayerFringe {
}
}
impl Ord for LayerVisit {
impl Ord for ReadDesc {
fn cmp(&self, other: &Self) -> Ordering {
let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
if ord == std::cmp::Ordering::Equal {
@@ -404,19 +411,19 @@ impl Ord for LayerVisit {
}
}
impl PartialOrd for LayerVisit {
impl PartialOrd for ReadDesc {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for LayerVisit {
impl PartialEq for ReadDesc {
fn eq(&self, other: &Self) -> bool {
self.lsn_range == other.lsn_range
}
}
impl Eq for LayerVisit {}
impl Eq for ReadDesc {}
impl ReadableLayer {
pub(crate) fn id(&self) -> LayerId {

View File

@@ -39,12 +39,12 @@ use crate::tenant::disk_btree::{
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadCoalesceMode, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
@@ -589,9 +589,7 @@ impl DeltaLayerWriterInner {
);
// fsync the file
file.sync_all()
.await
.maybe_fatal_err("delta_layer sync_all")?;
file.sync_all().await?;
trace!("created delta layer {}", self.path);
@@ -1023,30 +1021,13 @@ impl DeltaLayerInner {
continue;
}
};
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let blob_read = meta.read(&view).await;
let blob_read = match blob_read {
Ok(buf) => buf,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
let value = Value::des(&blob_read);
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
let value = match value {
Ok(v) => v,
Err(e) => {
@@ -1135,7 +1116,7 @@ impl DeltaLayerInner {
ctx: &RequestContext,
) -> anyhow::Result<usize> {
use crate::tenant::vectored_blob_io::{
BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended,
BlobMeta, VectoredReadBuilder, VectoredReadExtended,
};
use futures::stream::TryStreamExt;
@@ -1185,8 +1166,8 @@ impl DeltaLayerInner {
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
let align = virtual_file::get_io_buffer_alignment();
let mut read_builder: Option<VectoredReadBuilder> = None;
let read_mode = VectoredReadCoalesceMode::get();
let max_read_size = self
.max_vectored_read_bytes
@@ -1230,12 +1211,12 @@ impl DeltaLayerInner {
{
None
} else {
read_builder.replace(ChunkedVectoredReadBuilder::new(
read_builder.replace(VectoredReadBuilder::new(
offsets.start.pos(),
offsets.end.pos(),
meta,
max_read_size,
align,
read_mode,
))
}
} else {
@@ -1262,21 +1243,21 @@ impl DeltaLayerInner {
buf.reserve(read.size());
let res = reader.read_blobs(&read, buf, ctx).await?;
let view = BufView::new_slice(&res.buf);
for blob in res.blobs {
let key = blob.meta.key;
let lsn = blob.meta.lsn;
let data = blob.read(&view).await?;
let data = &res.buf[blob.start..blob.end];
#[cfg(debug_assertions)]
Value::des(&data)
Value::des(data)
.with_context(|| {
format!(
"blob failed to deserialize for {}: {:?}",
blob,
utils::Hex(&data)
"blob failed to deserialize for {}@{}, {}..{}: {:?}",
blob.meta.key,
blob.meta.lsn,
blob.start,
blob.end,
utils::Hex(data)
)
})
.unwrap();
@@ -1284,15 +1265,15 @@ impl DeltaLayerInner {
// is it an image or will_init walrecord?
// FIXME: this could be handled by threading the BlobRef to the
// VectoredReadBuilder
let will_init = crate::repository::ValueBytes::will_init(&data)
let will_init = crate::repository::ValueBytes::will_init(data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
})
.unwrap_or(false);
per_blob_copy.clear();
per_blob_copy.extend_from_slice(&data);
per_blob_copy.extend_from_slice(data);
let (tmp, res) = writer
.put_value_bytes(
@@ -1557,11 +1538,8 @@ impl<'a> DeltaLayerIterator<'a> {
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let blob_read = meta.read(&view).await?;
let value = Value::des(&blob_read)?;
let value = Value::des(&frozen_buf[meta.start..meta.end])?;
next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
}
self.key_values_batch = next_batch;
@@ -1938,13 +1916,9 @@ pub(crate) mod test {
let blobs_buf = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
.await?;
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let value = meta.read(&view).await?;
assert_eq!(
&value[..],
&entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
);
let value = &blobs_buf.buf[meta.start..meta.end];
assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
}
buf = Some(blobs_buf.buf);

View File

@@ -1,205 +0,0 @@
use std::ops::Range;
use anyhow::bail;
use pageserver_api::{
key::Key,
keyspace::{KeySpace, SparseKeySpace},
};
use utils::lsn::Lsn;
use crate::repository::Value;
use super::merge_iterator::MergeIterator;
/// A filter iterator over merge iterators (and can be easily extended to other types of iterators).
///
/// The iterator will skip any keys not included in the keyspace filter. In other words, the keyspace filter contains the keys
/// to be retained.
pub struct FilterIterator<'a> {
inner: MergeIterator<'a>,
retain_key_filters: Vec<Range<Key>>,
current_filter_idx: usize,
}
impl<'a> FilterIterator<'a> {
pub fn create(
inner: MergeIterator<'a>,
dense_keyspace: KeySpace,
sparse_keyspace: SparseKeySpace,
) -> anyhow::Result<Self> {
let mut retain_key_filters = Vec::new();
retain_key_filters.extend(dense_keyspace.ranges);
retain_key_filters.extend(sparse_keyspace.0.ranges);
retain_key_filters.sort_by(|a, b| a.start.cmp(&b.start));
// Verify key filters are non-overlapping and sorted
for window in retain_key_filters.windows(2) {
if window[0].end > window[1].start {
bail!(
"Key filters are overlapping: {:?} and {:?}",
window[0],
window[1]
);
}
}
Ok(Self {
inner,
retain_key_filters,
current_filter_idx: 0,
})
}
pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
while let Some(item) = self.inner.next().await? {
while self.current_filter_idx < self.retain_key_filters.len()
&& item.0 >= self.retain_key_filters[self.current_filter_idx].end
{
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter
self.current_filter_idx += 1;
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter
}
if self.current_filter_idx >= self.retain_key_filters.len() {
// We already exhausted all filters, so we should return now
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter (nothing)
return Ok(None);
}
if self.retain_key_filters[self.current_filter_idx].contains(&item.0) {
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter
return Ok(Some(item));
}
// If the key is not contained in the key retaining filters, continue to the next item.
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter
}
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
use itertools::Itertools;
use pageserver_api::key::Key;
use utils::lsn::Lsn;
use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::delta_layer::test::produce_delta_layer,
},
DEFAULT_PG_VERSION,
};
async fn assert_filter_iter_equal(
filter_iter: &mut FilterIterator<'_>,
expect: &[(Key, Lsn, Value)],
) {
let mut expect_iter = expect.iter();
loop {
let o1 = filter_iter.next().await.unwrap();
let o2 = expect_iter.next();
assert_eq!(o1.is_some(), o2.is_some());
if o1.is_none() && o2.is_none() {
break;
}
let (k1, l1, v1) = o1.unwrap();
let (k2, l2, v2) = o2.unwrap();
assert_eq!(&k1, k2);
assert_eq!(l1, *l2);
assert_eq!(&v1, v2);
}
}
#[tokio::test]
async fn filter_keyspace_iterator() {
use crate::repository::Value;
use bytes::Bytes;
let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
const N: usize = 100;
let test_deltas1 = (0..N)
.map(|idx| {
(
get_key(idx as u32),
Lsn(0x20 * ((idx as u64) % 10 + 1)),
Value::Image(Bytes::from(format!("img{idx:05}"))),
)
})
.collect_vec();
let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
.await
.unwrap();
let merge_iter = MergeIterator::create(
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
&[],
&ctx,
);
let mut filter_iter = FilterIterator::create(
merge_iter,
KeySpace {
ranges: vec![
get_key(5)..get_key(10),
get_key(20)..get_key(30),
get_key(90)..get_key(110),
get_key(1000)..get_key(2000),
],
},
SparseKeySpace(KeySpace::default()),
)
.unwrap();
let mut result = Vec::new();
result.extend(test_deltas1[5..10].iter().cloned());
result.extend(test_deltas1[20..30].iter().cloned());
result.extend(test_deltas1[90..100].iter().cloned());
assert_filter_iter_equal(&mut filter_iter, &result).await;
let merge_iter = MergeIterator::create(
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
&[],
&ctx,
);
let mut filter_iter = FilterIterator::create(
merge_iter,
KeySpace {
ranges: vec![
get_key(0)..get_key(10),
get_key(20)..get_key(30),
get_key(90)..get_key(95),
],
},
SparseKeySpace(KeySpace::default()),
)
.unwrap();
let mut result = Vec::new();
result.extend(test_deltas1[0..10].iter().cloned());
result.extend(test_deltas1[20..30].iter().cloned());
result.extend(test_deltas1[90..95].iter().cloned());
assert_filter_iter_equal(&mut filter_iter, &result).await;
}
}

View File

@@ -36,12 +36,11 @@ use crate::tenant::disk_btree::{
};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
@@ -59,6 +58,7 @@ use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tracing::*;
@@ -70,7 +70,9 @@ use utils::{
};
use super::layer_name::ImageLayerName;
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
use super::{
AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
};
///
/// Header stored in the beginning of the file
@@ -548,15 +550,15 @@ impl ImageLayerInner {
let buf = BytesMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
let img_buf = frozen_buf.slice(meta.start..meta.end);
key_count += 1;
writer
.put_image(meta.meta.key, img_buf.into_bytes(), ctx)
.put_image(meta.meta.key, img_buf, ctx)
.await
.context(format!("Storing key {}", meta.meta.key))?;
}
@@ -603,28 +605,13 @@ impl ImageLayerInner {
match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await;
let img_buf = match img_buf {
Ok(img_buf) => img_buf,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
))),
);
continue;
}
};
let img_buf = frozen_buf.slice(meta.start..meta.end);
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf.into_bytes()),
Value::Image(img_buf),
);
}
}
@@ -813,9 +800,10 @@ impl ImageLayerWriterInner {
///
async fn finish(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> anyhow::Result<ResidentLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -889,13 +877,14 @@ impl ImageLayerWriterInner {
// set inner.file here. The first read will have to re-open it.
// fsync the file
file.sync_all()
.await
.maybe_fatal_err("image_layer sync_all")?;
file.sync_all().await?;
trace!("created image layer {}", self.path);
// FIXME: why not carry the virtualfile here, it supports renaming?
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
Ok((desc, self.path))
info!("created image layer {}", layer.local_path());
Ok(layer)
}
}
@@ -974,18 +963,24 @@ impl ImageLayerWriter {
///
pub(crate) async fn finish(
mut self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(ctx, None).await
) -> anyhow::Result<super::ResidentLayer> {
self.inner.take().unwrap().finish(timeline, ctx, None).await
}
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
pub(super) async fn finish_with_end_key(
mut self,
timeline: &Arc<Timeline>,
end_key: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
) -> anyhow::Result<super::ResidentLayer> {
self.inner
.take()
.unwrap()
.finish(timeline, ctx, Some(end_key))
.await
}
}
@@ -1043,15 +1038,10 @@ impl<'a> ImageLayerIterator<'a> {
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let frozen_buf: Bytes = blobs_buf.buf.freeze();
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
next_batch.push_back((
meta.meta.key,
self.image_layer.lsn,
Value::Image(img_buf.into_bytes()),
));
let img_buf = frozen_buf.slice(meta.start..meta.end);
next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
}
self.key_values_batch = next_batch;
Ok(())
@@ -1094,7 +1084,7 @@ mod test {
tenant::{
config::TenantConf,
harness::{TenantHarness, TIMELINE_ID},
storage_layer::{Layer, ResidentLayer},
storage_layer::ResidentLayer,
vectored_blob_io::StreamingVectoredReadPlanner,
Tenant, Timeline,
},
@@ -1165,8 +1155,7 @@ mod test {
key = key.next();
}
let (desc, path) = writer.finish(&ctx).await.unwrap();
Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
writer.finish(&timeline, &ctx).await.unwrap()
};
let original_size = resident.metadata().file_size;
@@ -1228,9 +1217,7 @@ mod test {
.await
.unwrap();
let replacement = if wrote_keys > 0 {
let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
Some(resident)
Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
} else {
None
};
@@ -1303,8 +1290,7 @@ mod test {
for (key, img) in images {
writer.put_image(key, img, ctx).await?;
}
let (desc, path) = writer.finish(ctx).await?;
let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
let img_layer = writer.finish(tline, ctx).await?;
Ok::<_, anyhow::Error>(img_layer)
}

View File

@@ -439,30 +439,11 @@ impl Layer {
fn record_access(&self, ctx: &RequestContext) {
if self.0.access_stats.record_access(ctx) {
// Visibility was modified to Visible: maybe log about this
match ctx.task_kind() {
TaskKind::CalculateSyntheticSize
| TaskKind::GarbageCollector
| TaskKind::MgmtRequest => {
// This situation is expected in code paths do binary searches of the LSN space to resolve
// an LSN to a timestamp, which happens during GC, during GC cutoff calculations in synthetic size,
// and on-demand for certain HTTP API requests.
}
_ => {
// In all other contexts, it is unusual to do I/O involving layers which are not visible at
// some branch tip, so we log the fact that we are accessing something that the visibility
// calculation thought should not be visible.
//
// This case is legal in brief time windows: for example an in-flight getpage request can hold on to a layer object
// which was covered by a concurrent compaction.
tracing::info!(
"Layer {} became visible as a result of access",
self.0.desc.key()
);
}
}
// Update the timeline's visible bytes count
// Visibility was modified to Visible
tracing::info!(
"Layer {} became visible as a result of access",
self.0.desc.key()
);
if let Some(tl) = self.0.timeline.upgrade() {
tl.metrics
.visible_physical_size_gauge

Some files were not shown because too many files have changed in this diff Show More