mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Compare commits
15 Commits
tenant-tas
...
try-parkin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2a15415442 | ||
|
|
4a05413a4c | ||
|
|
dd61f3558f | ||
|
|
8a714f1ebf | ||
|
|
137291dc24 | ||
|
|
eb8926083e | ||
|
|
26bca6ddba | ||
|
|
55192384c3 | ||
|
|
392cd8b1fc | ||
|
|
3cc531d093 | ||
|
|
84b9fcbbd5 | ||
|
|
93e050afe3 | ||
|
|
6d7dc384a5 | ||
|
|
3c2b03cd87 | ||
|
|
7c49abe7d1 |
@@ -1,6 +1,7 @@
|
||||
[pageservers]
|
||||
#zenith-us-stage-ps-1 console_region_id=27
|
||||
zenith-us-stage-ps-2 console_region_id=27
|
||||
zenith-us-stage-ps-3 console_region_id=27
|
||||
|
||||
[safekeepers]
|
||||
zenith-us-stage-sk-4 console_region_id=27
|
||||
|
||||
@@ -286,7 +286,7 @@ jobs:
|
||||
# no_output_timeout, specified here.
|
||||
no_output_timeout: 10m
|
||||
environment:
|
||||
- ZENITH_BIN: /tmp/zenith/bin
|
||||
- NEON_BIN: /tmp/zenith/bin
|
||||
- POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install
|
||||
- TEST_OUTPUT: /tmp/test_output
|
||||
# this variable will be embedded in perf test report
|
||||
@@ -688,50 +688,6 @@ jobs:
|
||||
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy.yaml --set image.tag=${DOCKER_TAG} --wait
|
||||
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait
|
||||
|
||||
# Trigger a new remote CI job
|
||||
remote-ci-trigger:
|
||||
docker:
|
||||
- image: cimg/base:2021.04
|
||||
parameters:
|
||||
remote_repo:
|
||||
type: string
|
||||
environment:
|
||||
REMOTE_REPO: << parameters.remote_repo >>
|
||||
steps:
|
||||
- run:
|
||||
name: Set PR's status to pending
|
||||
command: |
|
||||
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
|
||||
|
||||
curl -f -X POST \
|
||||
https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
--user "$CI_ACCESS_TOKEN" \
|
||||
--data \
|
||||
"{
|
||||
\"state\": \"pending\",
|
||||
\"context\": \"neon-cloud-e2e\",
|
||||
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
|
||||
}"
|
||||
- run:
|
||||
name: Request a remote CI test
|
||||
command: |
|
||||
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
|
||||
|
||||
curl -f -X POST \
|
||||
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
--user "$CI_ACCESS_TOKEN" \
|
||||
--data \
|
||||
"{
|
||||
\"ref\": \"main\",
|
||||
\"inputs\": {
|
||||
\"ci_job_name\": \"neon-cloud-e2e\",
|
||||
\"commit_hash\": \"$CIRCLE_SHA1\",
|
||||
\"remote_repo\": \"$LOCAL_REPO\"
|
||||
}
|
||||
}"
|
||||
|
||||
workflows:
|
||||
build_and_test:
|
||||
jobs:
|
||||
@@ -880,14 +836,3 @@ workflows:
|
||||
- release
|
||||
requires:
|
||||
- docker-image-release
|
||||
- remote-ci-trigger:
|
||||
# Context passes credentials for gh api
|
||||
context: CI_ACCESS_TOKEN
|
||||
remote_repo: "neondatabase/cloud"
|
||||
requires:
|
||||
# XXX: Successful build doesn't mean everything is OK, but
|
||||
# the job to be triggered takes so much time to complete (~22 min)
|
||||
# that it's better not to wait for the commented-out steps
|
||||
- build-neon-release
|
||||
# - pg_regress-tests-release
|
||||
# - other-tests-release
|
||||
|
||||
29
.github/actions/run-python-test-set/action.yml
vendored
29
.github/actions/run-python-test-set/action.yml
vendored
@@ -2,25 +2,29 @@ name: 'Run python test'
|
||||
description: 'Runs a Neon python test set, performing all the required preparations before'
|
||||
|
||||
inputs:
|
||||
# Select the type of Rust build. Must be "release" or "debug".
|
||||
build_type:
|
||||
description: 'Type of Rust (neon) and C (postgres) builds. Must be "release" or "debug".'
|
||||
required: true
|
||||
rust_toolchain:
|
||||
description: 'Rust toolchain version to fetch the caches'
|
||||
required: true
|
||||
# This parameter is required, to prevent the mistake of running all tests in one job.
|
||||
test_selection:
|
||||
description: 'A python test suite to run'
|
||||
required: true
|
||||
# Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr
|
||||
extra_params:
|
||||
description: 'Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr'
|
||||
required: false
|
||||
default: ''
|
||||
needs_postgres_source:
|
||||
description: 'Set to true if the test suite requires postgres source checked out'
|
||||
required: false
|
||||
default: 'false'
|
||||
run_in_parallel:
|
||||
description: 'Whether to run tests in parallel'
|
||||
required: false
|
||||
default: 'true'
|
||||
save_perf_report:
|
||||
description: 'Whether to upload the performance report'
|
||||
required: false
|
||||
default: 'false'
|
||||
|
||||
@@ -60,7 +64,7 @@ runs:
|
||||
|
||||
- name: Run pytest
|
||||
env:
|
||||
ZENITH_BIN: /tmp/neon/bin
|
||||
NEON_BIN: /tmp/neon/bin
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
# this variable will be embedded in perf test report
|
||||
@@ -117,3 +121,20 @@ runs:
|
||||
scripts/generate_and_push_perf_report.sh
|
||||
fi
|
||||
fi
|
||||
|
||||
- name: Delete all data but logs
|
||||
shell: bash -ex {0}
|
||||
if: always()
|
||||
run: |
|
||||
du -sh /tmp/test_output/*
|
||||
find /tmp/test_output -type f ! -name "*.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" ! -name "flamegraph.svg" ! -name "*.metrics" -delete
|
||||
du -sh /tmp/test_output/*
|
||||
|
||||
- name: Upload python test logs
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
retention-days: 7
|
||||
if-no-files-found: error
|
||||
name: python-test-${{ inputs.test_selection }}-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-logs
|
||||
path: /tmp/test_output/
|
||||
|
||||
17
.github/actions/save-coverage-data/action.yml
vendored
Normal file
17
.github/actions/save-coverage-data/action.yml
vendored
Normal file
@@ -0,0 +1,17 @@
|
||||
name: 'Merge and upload coverage data'
|
||||
description: 'Compresses and uploads the coverage data as an artifact'
|
||||
|
||||
runs:
|
||||
using: "composite"
|
||||
steps:
|
||||
- name: Merge coverage data
|
||||
shell: bash -ex {0}
|
||||
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage/ merge
|
||||
|
||||
- name: Upload coverage data
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
retention-days: 7
|
||||
if-no-files-found: error
|
||||
name: coverage-data-artifact
|
||||
path: /tmp/neon/coverage/
|
||||
217
.github/workflows/build_and_test.yml
vendored
217
.github/workflows/build_and_test.yml
vendored
@@ -1,13 +1,33 @@
|
||||
name: build_and_test
|
||||
on: [ push ]
|
||||
name: Test
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
pull_request:
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -ex {0}
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
COPT: '-Werror'
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.CACHEPOT_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY }}
|
||||
CACHEPOT_BUCKET: zenith-rust-cachepot
|
||||
RUSTC_WRAPPER: cachepot
|
||||
|
||||
|
||||
jobs:
|
||||
build-postgres:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
build_type: [ debug, release ]
|
||||
rust_toolchain: [ 1.58 ]
|
||||
@@ -52,6 +72,7 @@ jobs:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
needs: [ build-postgres ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
build_type: [ debug, release ]
|
||||
rust_toolchain: [ 1.58 ]
|
||||
@@ -97,17 +118,11 @@ jobs:
|
||||
CARGO_FLAGS="--release --features profiling"
|
||||
fi
|
||||
|
||||
export CACHEPOT_BUCKET=zenith-rust-cachepot
|
||||
export RUSTC_WRAPPER=cachepot
|
||||
export AWS_ACCESS_KEY_ID="${{ secrets.AWS_ACCESS_KEY_ID }}"
|
||||
export AWS_SECRET_ACCESS_KEY="${{ secrets.AWS_SECRET_ACCESS_KEY }}"
|
||||
export HOME=/home/runner
|
||||
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
|
||||
cachepot -s
|
||||
|
||||
- name: Run cargo test
|
||||
run: |
|
||||
export HOME=/home/runner
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
CARGO_FLAGS=
|
||||
@@ -115,12 +130,11 @@ jobs:
|
||||
cov_prefix=()
|
||||
CARGO_FLAGS=--release
|
||||
fi
|
||||
|
||||
|
||||
"${cov_prefix[@]}" cargo test $CARGO_FLAGS
|
||||
|
||||
- name: Install rust binaries
|
||||
run: |
|
||||
export HOME=/home/runner
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
@@ -137,39 +151,34 @@ jobs:
|
||||
jq -r '.executable | select(. != null)'
|
||||
)
|
||||
|
||||
mkdir -p /tmp/neon/bin
|
||||
mkdir -p /tmp/neon/test_bin
|
||||
mkdir -p /tmp/neon/etc
|
||||
mkdir -p /tmp/neon/bin/
|
||||
mkdir -p /tmp/neon/test_bin/
|
||||
mkdir -p /tmp/neon/etc/
|
||||
mkdir -p /tmp/neon/coverage/
|
||||
|
||||
# Install target binaries
|
||||
for bin in $binaries; do
|
||||
SRC=target/$BUILD_TYPE/$bin
|
||||
DST=/tmp/neon/bin/$bin
|
||||
cp $SRC $DST
|
||||
echo $DST >> /tmp/neon/etc/binaries.list
|
||||
cp "$SRC" "$DST"
|
||||
done
|
||||
|
||||
# Install test executables (for code coverage)
|
||||
# Install test executables and write list of all binaries (for code coverage)
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
for bin in $binaries; do
|
||||
echo "/tmp/neon/bin/$bin" >> /tmp/neon/coverage/binaries.list
|
||||
done
|
||||
for bin in $test_exe_paths; do
|
||||
SRC=$bin
|
||||
DST=/tmp/neon/test_bin/$(basename $bin)
|
||||
cp $SRC $DST
|
||||
echo $DST >> /tmp/neon/etc/binaries.list
|
||||
cp "$SRC" "$DST"
|
||||
echo "$DST" >> /tmp/neon/coverage/binaries.list
|
||||
done
|
||||
fi
|
||||
|
||||
- name: Install postgres binaries
|
||||
run: cp -a tmp_install /tmp/neon/pg_install
|
||||
|
||||
- name: Merge coverage data
|
||||
run: |
|
||||
export HOME=/home/runner
|
||||
# This will speed up workspace uploads
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage merge
|
||||
fi
|
||||
|
||||
- name: Prepare neon artifact
|
||||
run: tar -C /tmp/neon/ -czf ./neon.tgz .
|
||||
|
||||
@@ -181,38 +190,17 @@ jobs:
|
||||
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
|
||||
path: ./neon.tgz
|
||||
|
||||
check-codestyle-python:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
strategy:
|
||||
matrix:
|
||||
rust_toolchain: [ 1.58 ]
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 1
|
||||
# XXX: keep this after the binaries.list is formed, so the coverage can properly work later
|
||||
- name: Merge and upload coverage data
|
||||
if: matrix.build_type == 'debug'
|
||||
uses: ./.github/actions/save-coverage-data
|
||||
|
||||
- name: Cache poetry deps
|
||||
id: cache_poetry
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: ~/.cache/pypoetry/virtualenvs
|
||||
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
|
||||
|
||||
- name: Install Python deps
|
||||
run: ./scripts/pysync
|
||||
|
||||
- name: Run yapf to ensure code format
|
||||
run: poetry run yapf --recursive --diff .
|
||||
|
||||
- name: Run mypy to check types
|
||||
run: poetry run mypy .
|
||||
|
||||
pg_regress-tests:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
needs: [ build-neon ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
build_type: [ debug, release ]
|
||||
rust_toolchain: [ 1.58 ]
|
||||
@@ -231,10 +219,15 @@ jobs:
|
||||
test_selection: batch_pg_regress
|
||||
needs_postgres_source: true
|
||||
|
||||
- name: Merge and upload coverage data
|
||||
if: matrix.build_type == 'debug'
|
||||
uses: ./.github/actions/save-coverage-data
|
||||
|
||||
other-tests:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
needs: [ build-neon ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
build_type: [ debug, release ]
|
||||
rust_toolchain: [ 1.58 ]
|
||||
@@ -252,10 +245,15 @@ jobs:
|
||||
rust_toolchain: ${{ matrix.rust_toolchain }}
|
||||
test_selection: batch_others
|
||||
|
||||
- name: Merge and upload coverage data
|
||||
if: matrix.build_type == 'debug'
|
||||
uses: ./.github/actions/save-coverage-data
|
||||
|
||||
benchmarks:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
needs: [ build-neon ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
build_type: [ release ]
|
||||
rust_toolchain: [ 1.58 ]
|
||||
@@ -273,4 +271,117 @@ jobs:
|
||||
rust_toolchain: ${{ matrix.rust_toolchain }}
|
||||
test_selection: performance
|
||||
run_in_parallel: false
|
||||
# save_perf_report: true
|
||||
save_perf_report: true
|
||||
# XXX: no coverage data handling here, since benchmarks are run on release builds,
|
||||
# while coverage is currently collected for the debug ones
|
||||
|
||||
coverage-report:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
needs: [ other-tests, pg_regress-tests ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
build_type: [ debug ]
|
||||
rust_toolchain: [ 1.58 ]
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 1
|
||||
|
||||
- name: Restore cargo deps cache
|
||||
id: cache_cargo
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry/
|
||||
~/.cargo/git/
|
||||
target/
|
||||
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
|
||||
|
||||
- name: Get Neon artifact for restoration
|
||||
uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
|
||||
path: ./neon-artifact/
|
||||
|
||||
- name: Extract Neon artifact
|
||||
run: |
|
||||
mkdir -p /tmp/neon/
|
||||
tar -xf ./neon-artifact/neon.tgz -C /tmp/neon/
|
||||
rm -rf ./neon-artifact/
|
||||
|
||||
- name: Restore coverage data
|
||||
uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: coverage-data-artifact
|
||||
path: /tmp/neon/coverage/
|
||||
|
||||
- name: Build and upload coverage report
|
||||
run: |
|
||||
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
|
||||
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
|
||||
COMMIT_URL=https://github.com/${{ github.repository }}/commit/$COMMIT_SHA
|
||||
|
||||
scripts/coverage \
|
||||
--dir=/tmp/neon/coverage report \
|
||||
--input-objects=/tmp/neon/coverage/binaries.list \
|
||||
--commit-url=$COMMIT_URL \
|
||||
--format=github
|
||||
|
||||
REPORT_URL=https://${{ github.repository_owner }}.github.io/zenith-coverage-data/$COMMIT_SHA
|
||||
|
||||
scripts/git-upload \
|
||||
--repo=https://${{ secrets.VIP_VAP_ACCESS_TOKEN }}@github.com/${{ github.repository_owner }}/zenith-coverage-data.git \
|
||||
--message="Add code coverage for $COMMIT_URL" \
|
||||
copy /tmp/neon/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
|
||||
|
||||
# Add link to the coverage report to the commit
|
||||
curl -f -X POST \
|
||||
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
|
||||
--data \
|
||||
"{
|
||||
\"state\": \"success\",
|
||||
\"context\": \"neon-coverage\",
|
||||
\"description\": \"Coverage report is ready\",
|
||||
\"target_url\": \"$REPORT_URL\"
|
||||
}"
|
||||
|
||||
trigger-e2e-tests:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
needs: [ build-neon ]
|
||||
steps:
|
||||
- name: Set PR's status to pending and request a remote CI test
|
||||
run: |
|
||||
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
|
||||
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
|
||||
|
||||
REMOTE_REPO="${{ github.repository_owner }}/cloud"
|
||||
|
||||
curl -f -X POST \
|
||||
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
|
||||
--data \
|
||||
"{
|
||||
\"state\": \"pending\",
|
||||
\"context\": \"neon-cloud-e2e\",
|
||||
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
|
||||
}"
|
||||
|
||||
curl -f -X POST \
|
||||
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
|
||||
--data \
|
||||
"{
|
||||
\"ref\": \"main\",
|
||||
\"inputs\": {
|
||||
\"ci_job_name\": \"neon-cloud-e2e\",
|
||||
\"commit_hash\": \"$COMMIT_SHA\",
|
||||
\"remote_repo\": \"${{ github.repository }}\"
|
||||
}
|
||||
}"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Build and Test
|
||||
name: Check code style and build
|
||||
|
||||
on:
|
||||
push:
|
||||
@@ -6,9 +6,21 @@ on:
|
||||
- main
|
||||
pull_request:
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -ex {0}
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
|
||||
jobs:
|
||||
regression-check:
|
||||
check-codestyle-rust:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
# If we want to duplicate this job for different
|
||||
# Rust toolchains (e.g. nightly or 1.37.0), add them here.
|
||||
@@ -92,5 +104,30 @@ jobs:
|
||||
- name: Run cargo clippy
|
||||
run: ./run_clippy.sh
|
||||
|
||||
- name: Run cargo test
|
||||
run: cargo test --all --all-targets
|
||||
- name: Ensure all project builds
|
||||
run: cargo build --all --all-targets
|
||||
|
||||
check-codestyle-python:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: false
|
||||
fetch-depth: 1
|
||||
|
||||
- name: Cache poetry deps
|
||||
id: cache_poetry
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: ~/.cache/pypoetry/virtualenvs
|
||||
key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }}
|
||||
|
||||
- name: Install Python deps
|
||||
run: ./scripts/pysync
|
||||
|
||||
- name: Run yapf to ensure code format
|
||||
run: poetry run yapf --recursive --diff .
|
||||
|
||||
- name: Run mypy to check types
|
||||
run: poetry run mypy .
|
||||
18
Cargo.lock
generated
18
Cargo.lock
generated
@@ -461,6 +461,7 @@ dependencies = [
|
||||
"tar",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"urlencoding",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -649,7 +650,7 @@ dependencies = [
|
||||
"crossterm_winapi",
|
||||
"libc",
|
||||
"mio",
|
||||
"parking_lot 0.12.0",
|
||||
"parking_lot 0.12.1",
|
||||
"signal-hook",
|
||||
"signal-hook-mio",
|
||||
"winapi",
|
||||
@@ -1898,6 +1899,7 @@ dependencies = [
|
||||
"metrics",
|
||||
"nix",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
@@ -1938,9 +1940,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.0"
|
||||
version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
|
||||
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core 0.9.2",
|
||||
@@ -2307,7 +2309,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"md5",
|
||||
"metrics",
|
||||
"parking_lot 0.12.0",
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
"rand",
|
||||
"rcgen",
|
||||
@@ -3355,7 +3357,7 @@ dependencies = [
|
||||
"fallible-iterator",
|
||||
"futures",
|
||||
"log",
|
||||
"parking_lot 0.12.0",
|
||||
"parking_lot 0.12.1",
|
||||
"percent-encoding",
|
||||
"phf",
|
||||
"pin-project-lite",
|
||||
@@ -3684,6 +3686,12 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "urlencoding"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68b90931029ab9b034b300b797048cf23723400aa757e8a2bfb9d748102f9821"
|
||||
|
||||
[[package]]
|
||||
name = "utils"
|
||||
version = "0.1.0"
|
||||
|
||||
10
Dockerfile
10
Dockerfile
@@ -1,5 +1,5 @@
|
||||
# Build Postgres
|
||||
FROM zimg/rust:1.58 AS pg-build
|
||||
FROM neondatabase/rust:1.58 AS pg-build
|
||||
WORKDIR /pg
|
||||
|
||||
USER root
|
||||
@@ -14,7 +14,7 @@ RUN set -e \
|
||||
&& tar -C tmp_install -czf /postgres_install.tar.gz .
|
||||
|
||||
# Build zenith binaries
|
||||
FROM zimg/rust:1.58 AS build
|
||||
FROM neondatabase/rust:1.58 AS build
|
||||
ARG GIT_VERSION=local
|
||||
|
||||
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
|
||||
@@ -46,9 +46,9 @@ RUN set -e \
|
||||
&& useradd -d /data zenith \
|
||||
&& chown -R zenith:zenith /data
|
||||
|
||||
COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/pageserver /usr/local/bin
|
||||
COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/safekeeper /usr/local/bin
|
||||
COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/proxy /usr/local/bin
|
||||
COPY --from=build --chown=zenith:zenith /home/runner/target/release/pageserver /usr/local/bin
|
||||
COPY --from=build --chown=zenith:zenith /home/runner/target/release/safekeeper /usr/local/bin
|
||||
COPY --from=build --chown=zenith:zenith /home/runner/target/release/proxy /usr/local/bin
|
||||
|
||||
COPY --from=pg-build /pg/tmp_install/ /usr/local/
|
||||
COPY --from=pg-build /postgres_install.tar.gz /data/
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# First transient image to build compute_tools binaries
|
||||
# NB: keep in sync with rust image version in .circle/config.yml
|
||||
FROM zimg/rust:1.58 AS rust-build
|
||||
FROM neondatabase/rust:1.58 AS rust-build
|
||||
|
||||
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
|
||||
ARG AWS_ACCESS_KEY_ID
|
||||
@@ -15,4 +15,4 @@ RUN set -e \
|
||||
# Final image that only has one binary
|
||||
FROM debian:buster-slim
|
||||
|
||||
COPY --from=rust-build /home/circleci/project/target/release/compute_ctl /usr/local/bin/compute_ctl
|
||||
COPY --from=rust-build /home/runner/target/release/compute_ctl /usr/local/bin/compute_ctl
|
||||
|
||||
@@ -53,7 +53,7 @@ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
|
||||
1. Install XCode and dependencies
|
||||
```
|
||||
xcode-select --install
|
||||
brew install protobuf etcd
|
||||
brew install protobuf etcd openssl
|
||||
```
|
||||
|
||||
2. [Install Rust](https://www.rust-lang.org/tools/install)
|
||||
|
||||
@@ -18,4 +18,5 @@ serde_json = "1"
|
||||
tar = "0.4"
|
||||
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
|
||||
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
urlencoding = "2.1.0"
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -289,6 +289,7 @@ impl ComputeNode {
|
||||
|
||||
handle_roles(&self.spec, &mut client)?;
|
||||
handle_databases(&self.spec, &mut client)?;
|
||||
handle_role_deletions(self, &mut client)?;
|
||||
handle_grants(&self.spec, &mut client)?;
|
||||
create_writablity_check_data(&mut client)?;
|
||||
|
||||
|
||||
@@ -2,9 +2,11 @@ use std::path::Path;
|
||||
|
||||
use anyhow::Result;
|
||||
use log::{info, log_enabled, warn, Level};
|
||||
use postgres::Client;
|
||||
use postgres::{Client, NoTls};
|
||||
use serde::Deserialize;
|
||||
use urlencoding::encode;
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::config;
|
||||
use crate::params::PG_HBA_ALL_MD5;
|
||||
use crate::pg_helpers::*;
|
||||
@@ -97,18 +99,13 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
|
||||
// Process delta operations first
|
||||
if let Some(ops) = &spec.delta_operations {
|
||||
info!("processing delta operations on roles");
|
||||
info!("processing role renames");
|
||||
for op in ops {
|
||||
match op.action.as_ref() {
|
||||
// We do not check either role exists or not,
|
||||
// Postgres will take care of it for us
|
||||
"delete_role" => {
|
||||
let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.quote());
|
||||
|
||||
warn!("deleting role '{}'", &op.name);
|
||||
xact.execute(query.as_str(), &[])?;
|
||||
// no-op now, roles will be deleted at the end of configuration
|
||||
}
|
||||
// Renaming role drops its password, since tole name is
|
||||
// Renaming role drops its password, since role name is
|
||||
// used as a salt there. It is important that this role
|
||||
// is recorded with a new `name` in the `roles` list.
|
||||
// Follow up roles update will set the new password.
|
||||
@@ -182,7 +179,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
xact.execute(query.as_str(), &[])?;
|
||||
|
||||
let grant_query = format!(
|
||||
"grant pg_read_all_data, pg_write_all_data to {}",
|
||||
"GRANT pg_read_all_data, pg_write_all_data TO {}",
|
||||
name.quote()
|
||||
);
|
||||
xact.execute(grant_query.as_str(), &[])?;
|
||||
@@ -197,6 +194,68 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reassign all dependent objects and delete requested roles.
|
||||
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
||||
let spec = &node.spec;
|
||||
|
||||
// First, reassign all dependent objects to db owners.
|
||||
if let Some(ops) = &spec.delta_operations {
|
||||
info!("reassigning dependent objects of to-be-deleted roles");
|
||||
for op in ops {
|
||||
if op.action == "delete_role" {
|
||||
reassign_owned_objects(node, &op.name)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Second, proceed with role deletions.
|
||||
let mut xact = client.transaction()?;
|
||||
if let Some(ops) = &spec.delta_operations {
|
||||
info!("processing role deletions");
|
||||
for op in ops {
|
||||
// We do not check either role exists or not,
|
||||
// Postgres will take care of it for us
|
||||
if op.action == "delete_role" {
|
||||
let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.quote());
|
||||
|
||||
warn!("deleting role '{}'", &op.name);
|
||||
xact.execute(query.as_str(), &[])?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Reassign all owned objects in all databases to the owner of the database.
|
||||
fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> {
|
||||
for db in &node.spec.cluster.databases {
|
||||
if db.owner != *role_name {
|
||||
let db_name_encoded = format!("/{}", encode(&db.name));
|
||||
let db_connstr = node.connstr.replacen("/postgres", &db_name_encoded, 1);
|
||||
let mut client = Client::connect(&db_connstr, NoTls)?;
|
||||
|
||||
// This will reassign all dependent objects to the db owner
|
||||
let reassign_query = format!(
|
||||
"REASSIGN OWNED BY {} TO {}",
|
||||
role_name.quote(),
|
||||
db.owner.quote()
|
||||
);
|
||||
info!(
|
||||
"reassigning objects owned by '{}' in db '{}' to '{}'",
|
||||
role_name, &db.name, &db.owner
|
||||
);
|
||||
client.simple_query(&reassign_query)?;
|
||||
|
||||
// This now will only drop privileges of the role
|
||||
let drop_query = format!("DROP OWNED BY {}", role_name.quote());
|
||||
client.simple_query(&drop_query)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// It follows mostly the same logic as `handle_roles()` excepting that we
|
||||
/// does not use an explicit transactions block, since major database operations
|
||||
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
|
||||
@@ -294,13 +353,26 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
pub fn handle_grants(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
info!("cluster spec grants:");
|
||||
|
||||
// We now have a separate `web_access` role to connect to the database
|
||||
// via the web interface and proxy link auth. And also we grant a
|
||||
// read / write all data privilege to every role. So also grant
|
||||
// create to everyone.
|
||||
// XXX: later we should stop messing with Postgres ACL in such horrible
|
||||
// ways.
|
||||
let roles = spec
|
||||
.cluster
|
||||
.roles
|
||||
.iter()
|
||||
.map(|r| r.name.quote())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for db in &spec.cluster.databases {
|
||||
let dbname = &db.name;
|
||||
|
||||
let query: String = format!(
|
||||
"GRANT CREATE ON DATABASE {} TO {}",
|
||||
dbname.quote(),
|
||||
db.owner.quote()
|
||||
roles.join(", ")
|
||||
);
|
||||
info!("grant query {}", &query);
|
||||
|
||||
|
||||
@@ -6,17 +6,13 @@ pub mod subscription_key;
|
||||
/// All broker values, possible to use when dealing with etcd.
|
||||
pub mod subscription_value;
|
||||
|
||||
use std::{
|
||||
collections::{hash_map, HashMap},
|
||||
str::FromStr,
|
||||
};
|
||||
use std::str::FromStr;
|
||||
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
use subscription_key::SubscriptionKey;
|
||||
use tokio::{sync::mpsc, task::JoinHandle};
|
||||
use tracing::*;
|
||||
use utils::zid::{NodeId, ZTenantTimelineId};
|
||||
|
||||
use crate::subscription_key::SubscriptionFullKey;
|
||||
|
||||
@@ -28,18 +24,17 @@ pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon";
|
||||
|
||||
/// A way to control the data retrieval from a certain subscription.
|
||||
pub struct BrokerSubscription<V> {
|
||||
value_updates: mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>>,
|
||||
/// An unbounded channel to fetch the relevant etcd updates from.
|
||||
pub value_updates: mpsc::UnboundedReceiver<BrokerUpdate<V>>,
|
||||
key: SubscriptionKey,
|
||||
watcher_handle: JoinHandle<Result<(), BrokerError>>,
|
||||
/// A subscription task handle, to allow waiting on it for the task to complete.
|
||||
/// Both the updates channel and the handle require `&mut`, so it's better to keep
|
||||
/// both `pub` to allow using both in the same structures without borrow checker complaining.
|
||||
pub watcher_handle: JoinHandle<Result<(), BrokerError>>,
|
||||
watcher: Watcher,
|
||||
}
|
||||
|
||||
impl<V> BrokerSubscription<V> {
|
||||
/// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet.
|
||||
pub async fn fetch_data(&mut self) -> Option<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>> {
|
||||
self.value_updates.recv().await
|
||||
}
|
||||
|
||||
/// Cancels the subscription, stopping the data poller and waiting for it to shut down.
|
||||
pub async fn cancel(mut self) -> Result<(), BrokerError> {
|
||||
self.watcher.cancel().await.map_err(|e| {
|
||||
@@ -48,15 +43,41 @@ impl<V> BrokerSubscription<V> {
|
||||
format!("Failed to cancel broker subscription, kind: {:?}", self.key),
|
||||
)
|
||||
})?;
|
||||
self.watcher_handle.await.map_err(|e| {
|
||||
BrokerError::InternalError(format!(
|
||||
"Failed to join the broker value updates task, kind: {:?}, error: {e}",
|
||||
self.key
|
||||
))
|
||||
})?
|
||||
match (&mut self.watcher_handle).await {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
if e.is_cancelled() {
|
||||
// don't error on the tasks that are cancelled already
|
||||
Ok(())
|
||||
} else {
|
||||
Err(BrokerError::InternalError(format!(
|
||||
"Panicked during broker subscription task, kind: {:?}, error: {e}",
|
||||
self.key
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<V> Drop for BrokerSubscription<V> {
|
||||
fn drop(&mut self) {
|
||||
// we poll data from etcd into the channel in the same struct, so if the whole struct gets dropped,
|
||||
// no more data is used by the receiver and it's safe to cancel and drop the whole etcd subscription task.
|
||||
self.watcher_handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/// An update from the etcd broker.
|
||||
pub struct BrokerUpdate<V> {
|
||||
/// Etcd generation version, the bigger the more actual the data is.
|
||||
pub etcd_version: i64,
|
||||
/// Etcd key for the corresponding value, parsed from the broker KV.
|
||||
pub key: SubscriptionFullKey,
|
||||
/// Current etcd value, parsed from the broker KV.
|
||||
pub value: V,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BrokerError {
|
||||
#[error("Etcd client error: {0}. Context: {1}")]
|
||||
@@ -124,41 +145,21 @@ where
|
||||
break;
|
||||
}
|
||||
|
||||
let mut value_updates: HashMap<ZTenantTimelineId, HashMap<NodeId, V>> = HashMap::new();
|
||||
// Keep track that the timeline data updates from etcd arrive in the right order.
|
||||
// https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
|
||||
// > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering.
|
||||
let mut value_etcd_versions: HashMap<ZTenantTimelineId, i64> = HashMap::new();
|
||||
|
||||
|
||||
let events = resp.events();
|
||||
debug!("Processing {} events", events.len());
|
||||
|
||||
for event in events {
|
||||
if EventType::Put == event.event_type() {
|
||||
if let Some(new_etcd_kv) = event.kv() {
|
||||
let new_kv_version = new_etcd_kv.version();
|
||||
|
||||
match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) {
|
||||
Ok(Some((key, value))) => match value_updates
|
||||
.entry(key.id)
|
||||
.or_default()
|
||||
.entry(key.node_id)
|
||||
{
|
||||
hash_map::Entry::Occupied(mut o) => {
|
||||
let old_etcd_kv_version = value_etcd_versions.get(&key.id).copied().unwrap_or(i64::MIN);
|
||||
if old_etcd_kv_version < new_kv_version {
|
||||
o.insert(value);
|
||||
value_etcd_versions.insert(key.id,new_kv_version);
|
||||
} else {
|
||||
debug!("Skipping etcd timeline update due to older version compared to one that's already stored");
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
v.insert(value);
|
||||
value_etcd_versions.insert(key.id,new_kv_version);
|
||||
}
|
||||
},
|
||||
Ok(Some((key, value))) => if let Err(e) = value_updates_sender.send(BrokerUpdate {
|
||||
etcd_version: new_etcd_kv.version(),
|
||||
key,
|
||||
value,
|
||||
}) {
|
||||
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
|
||||
break;
|
||||
},
|
||||
Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"),
|
||||
Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"),
|
||||
Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"),
|
||||
@@ -166,13 +167,6 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !value_updates.is_empty() {
|
||||
if let Err(e) = value_updates_sender.send(value_updates) {
|
||||
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -63,6 +63,8 @@ workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
close_fds = "0.3.2"
|
||||
walkdir = "2.3.2"
|
||||
|
||||
parking_lot = "0.12.1"
|
||||
|
||||
[dev-dependencies]
|
||||
hex-literal = "0.3"
|
||||
tempfile = "3.2"
|
||||
|
||||
@@ -263,8 +263,6 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
// start profiler (if enabled)
|
||||
let profiler_guard = profiling::init_profiler(conf);
|
||||
|
||||
pageserver::tenant_tasks::init_tenant_task_pool()?;
|
||||
|
||||
// initialize authentication for incoming connections
|
||||
let auth = match &conf.auth_type {
|
||||
AuthType::Trust | AuthType::MD5 => None,
|
||||
|
||||
@@ -39,8 +39,7 @@ use crate::storage_sync::index::RemoteIndex;
|
||||
use crate::tenant_config::{TenantConf, TenantConfOpt};
|
||||
|
||||
use crate::repository::{
|
||||
GcResult, RepoIoError, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate,
|
||||
TimelineWriter,
|
||||
GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter,
|
||||
};
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant_mgr;
|
||||
@@ -159,11 +158,6 @@ pub struct LayeredRepository {
|
||||
// Global pageserver config parameters
|
||||
pub conf: &'static PageServerConf,
|
||||
|
||||
// Freezing the repo disallows any writes to its directory.
|
||||
//
|
||||
// Any writers must hold frozen.try_read() while writing.
|
||||
pub frozen: RwLock<bool>,
|
||||
|
||||
// Overridden tenant-specific config parameters.
|
||||
// We keep TenantConfOpt sturct here to preserve the information
|
||||
// about parameters that are not set.
|
||||
@@ -325,38 +319,19 @@ impl Repository for LayeredRepository {
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult, RepoIoError> {
|
||||
) -> Result<GcResult> {
|
||||
let timeline_str = target_timelineid
|
||||
.map(|x| x.to_string())
|
||||
.unwrap_or_else(|| "-".to_string());
|
||||
|
||||
// Make sure repo is not frozen
|
||||
let guard = match self.frozen.try_read() {
|
||||
Ok(g) => g,
|
||||
Err(_) => return Err(RepoIoError::RepoFreezingError),
|
||||
};
|
||||
if *guard {
|
||||
return Err(RepoIoError::RepoFrozenError);
|
||||
}
|
||||
|
||||
STORAGE_TIME
|
||||
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
|
||||
.observe_closure_duration(|| {
|
||||
self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
|
||||
})
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn compaction_iteration(&self) -> Result<(), RepoIoError> {
|
||||
// Make sure repo is not frozen
|
||||
let guard = match self.frozen.try_read() {
|
||||
Ok(g) => g,
|
||||
Err(_) => return Err(RepoIoError::RepoFreezingError),
|
||||
};
|
||||
if *guard {
|
||||
return Err(RepoIoError::RepoFrozenError);
|
||||
}
|
||||
|
||||
fn compaction_iteration(&self) -> Result<()> {
|
||||
// Scan through the hashmap and collect a list of all the timelines,
|
||||
// while holding the lock. Then drop the lock and actually perform the
|
||||
// compactions. We don't want to block everything else while the
|
||||
@@ -710,7 +685,6 @@ impl LayeredRepository {
|
||||
) -> LayeredRepository {
|
||||
LayeredRepository {
|
||||
tenant_id,
|
||||
frozen: RwLock::new(false),
|
||||
conf,
|
||||
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
|
||||
@@ -13,7 +13,7 @@ pub mod repository;
|
||||
pub mod storage_sync;
|
||||
pub mod tenant_config;
|
||||
pub mod tenant_mgr;
|
||||
pub mod tenant_tasks;
|
||||
pub mod tenant_threads;
|
||||
pub mod thread_mgr;
|
||||
pub mod timelines;
|
||||
pub mod virtual_file;
|
||||
|
||||
@@ -36,13 +36,12 @@
|
||||
//! mapping is automatically removed and the slot is marked free.
|
||||
//!
|
||||
|
||||
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::{
|
||||
atomic::{AtomicU8, AtomicUsize, Ordering},
|
||||
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
|
||||
},
|
||||
sync::atomic::{AtomicU8, AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
@@ -385,7 +384,7 @@ impl PageCache {
|
||||
for slot_idx in 0..self.slots.len() {
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
let mut inner = slot.inner.write().unwrap();
|
||||
let mut inner = slot.inner.write();
|
||||
if let Some(key) = &inner.key {
|
||||
match key {
|
||||
CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => {
|
||||
@@ -413,7 +412,7 @@ impl PageCache {
|
||||
for slot_idx in 0..self.slots.len() {
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
let mut inner = slot.inner.write().unwrap();
|
||||
let mut inner = slot.inner.write();
|
||||
if let Some(key) = &inner.key {
|
||||
match key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno: _ }
|
||||
@@ -454,7 +453,7 @@ impl PageCache {
|
||||
// that it's still what we expected (because we released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.read().unwrap();
|
||||
let inner = slot.inner.read();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageReadGuard(inner));
|
||||
@@ -543,7 +542,7 @@ impl PageCache {
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().unwrap();
|
||||
let inner = slot.inner.write();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageWriteGuard { inner, valid: true });
|
||||
@@ -611,7 +610,7 @@ impl PageCache {
|
||||
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
|
||||
match cache_key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let map = self.materialized_page_map.read();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
@@ -624,11 +623,11 @@ impl PageCache {
|
||||
Some(version.slot_idx)
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let map = self.ephemeral_page_map.read().unwrap();
|
||||
let map = self.ephemeral_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
let map = self.immutable_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
@@ -641,7 +640,7 @@ impl PageCache {
|
||||
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
|
||||
match key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let map = self.materialized_page_map.read();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
@@ -651,11 +650,11 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let map = self.ephemeral_page_map.read().unwrap();
|
||||
let map = self.ephemeral_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
let map = self.immutable_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
@@ -670,7 +669,7 @@ impl PageCache {
|
||||
hash_key: old_hash_key,
|
||||
lsn: old_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let mut map = self.materialized_page_map.write();
|
||||
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
|
||||
let versions = old_entry.get_mut();
|
||||
|
||||
@@ -685,12 +684,12 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let mut map = self.ephemeral_page_map.write().unwrap();
|
||||
let mut map = self.ephemeral_page_map.write();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
let mut map = self.immutable_page_map.write();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
}
|
||||
@@ -708,7 +707,7 @@ impl PageCache {
|
||||
hash_key: new_key,
|
||||
lsn: new_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let mut map = self.materialized_page_map.write();
|
||||
let versions = map.entry(new_key.clone()).or_default();
|
||||
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => Some(versions[version_idx].slot_idx),
|
||||
@@ -725,7 +724,7 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let mut map = self.ephemeral_page_map.write().unwrap();
|
||||
let mut map = self.ephemeral_page_map.write();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
@@ -735,7 +734,7 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
let mut map = self.immutable_page_map.write();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
@@ -765,11 +764,8 @@ impl PageCache {
|
||||
|
||||
if slot.dec_usage_count() == 0 {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(TryLockError::Poisoned(err)) => {
|
||||
panic!("buffer lock was poisoned: {:?}", err)
|
||||
}
|
||||
Err(TryLockError::WouldBlock) => {
|
||||
Some(inner) => inner,
|
||||
None => {
|
||||
// If we have looped through the whole buffer pool 10 times
|
||||
// and still haven't found a victim buffer, something's wrong.
|
||||
// Maybe all the buffers were in locked. That could happen in
|
||||
|
||||
@@ -733,17 +733,10 @@ impl PageServerHandler {
|
||||
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
|
||||
|
||||
let all_rels = timeline.list_rels(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
|
||||
let mut total_blocks: i64 = 0;
|
||||
let total_blocks =
|
||||
timeline.get_db_size(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
|
||||
|
||||
for rel in all_rels {
|
||||
if rel.forknum == 0 {
|
||||
let n_blocks = timeline.get_rel_size(rel, lsn).unwrap_or(0);
|
||||
total_blocks += n_blocks as i64;
|
||||
}
|
||||
}
|
||||
|
||||
let db_size = total_blocks * pg_constants::BLCKSZ as i64;
|
||||
let db_size = total_blocks as i64 * pg_constants::BLCKSZ as i64;
|
||||
|
||||
Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
|
||||
db_size,
|
||||
|
||||
@@ -123,6 +123,19 @@ impl<R: Repository> DatadirTimeline<R> {
|
||||
self.tline.get(key, lsn)
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
|
||||
let mut total_blocks = 0;
|
||||
|
||||
let rels = self.list_rels(spcnode, dbnode, lsn)?;
|
||||
|
||||
for rel in rels {
|
||||
let n_blocks = self.get_rel_size(rel, lsn)?;
|
||||
total_blocks += n_blocks as usize;
|
||||
}
|
||||
Ok(total_blocks)
|
||||
}
|
||||
|
||||
/// Get size of a relation file
|
||||
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
|
||||
ensure!(tag.relnode != 0, "invalid relnode");
|
||||
@@ -667,6 +680,10 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
}
|
||||
|
||||
pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> Result<()> {
|
||||
let req_lsn = self.tline.get_last_record_lsn();
|
||||
|
||||
let total_blocks = self.tline.get_db_size(spcnode, dbnode, req_lsn)?;
|
||||
|
||||
// Remove entry from dbdir
|
||||
let buf = self.get(DBDIR_KEY)?;
|
||||
let mut dir = DbDirectory::des(&buf)?;
|
||||
@@ -680,7 +697,8 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
);
|
||||
}
|
||||
|
||||
// FIXME: update pending_nblocks
|
||||
// Update logical database size.
|
||||
self.pending_nblocks -= total_blocks as isize;
|
||||
|
||||
// Delete all relations and metadata files for the spcnode/dnode
|
||||
self.delete(dbdir_key_range(spcnode, dbnode));
|
||||
|
||||
@@ -196,19 +196,6 @@ impl Display for TimelineSyncStatusUpdate {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum RepoIoError {
|
||||
#[error("Cannot write to repo path while repo is frozen")]
|
||||
RepoFrozenError,
|
||||
|
||||
#[error("Cannot write to repo path while repo is being frozen")]
|
||||
RepoFreezingError,
|
||||
|
||||
/// Unstructured anyhow error
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
///
|
||||
/// A repository corresponds to one .neon directory. One repository holds multiple
|
||||
/// timelines, forked off from the same initial call to 'initdb'.
|
||||
@@ -265,13 +252,13 @@ pub trait Repository: Send + Sync {
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult, RepoIoError>;
|
||||
) -> Result<GcResult>;
|
||||
|
||||
/// Perform one compaction iteration.
|
||||
/// This function is periodically called by compactor thread.
|
||||
/// Also it can be explicitly requested per timeline through page server
|
||||
/// api's 'compact' command.
|
||||
fn compaction_iteration(&self) -> Result<(), RepoIoError>;
|
||||
fn compaction_iteration(&self) -> Result<()>;
|
||||
|
||||
/// detaches timeline-related in-memory data.
|
||||
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
|
||||
|
||||
@@ -230,6 +230,8 @@ pub fn shutdown_all_tenants() {
|
||||
drop(m);
|
||||
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None);
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
|
||||
|
||||
// Ok, no background threads running anymore. Flush any remaining data in
|
||||
// memory to disk.
|
||||
@@ -328,15 +330,44 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
|
||||
}
|
||||
(TenantState::Idle, TenantState::Active) => {
|
||||
info!("activating tenant {tenant_id}");
|
||||
let compactor_spawn_result = thread_mgr::spawn(
|
||||
ThreadKind::Compactor,
|
||||
Some(tenant_id),
|
||||
None,
|
||||
"Compactor thread",
|
||||
false,
|
||||
move || crate::tenant_threads::compact_loop(tenant_id),
|
||||
);
|
||||
if compactor_spawn_result.is_err() {
|
||||
let mut m = tenants_state::write_tenants();
|
||||
m.get_mut(&tenant_id)
|
||||
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
|
||||
.state = old_state;
|
||||
drop(m);
|
||||
}
|
||||
compactor_spawn_result?;
|
||||
|
||||
// Unfreeze the repo, allowing gc/compaction jobs to run
|
||||
let repo = get_repository_for_tenant(tenant_id)?;
|
||||
*repo.frozen.write().unwrap() = false;
|
||||
let gc_spawn_result = thread_mgr::spawn(
|
||||
ThreadKind::GarbageCollector,
|
||||
Some(tenant_id),
|
||||
None,
|
||||
"GC thread",
|
||||
false,
|
||||
move || crate::tenant_threads::gc_loop(tenant_id),
|
||||
)
|
||||
.map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
|
||||
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
|
||||
|
||||
// Spawn gc and compaction loops. The loops will shut themselves
|
||||
// down when tenant becomes idle and repo becomes frozen.
|
||||
crate::tenant_tasks::start_compaction_loop(tenant_id)?;
|
||||
crate::tenant_tasks::start_gc_loop(tenant_id)?;
|
||||
if let Err(e) = &gc_spawn_result {
|
||||
let mut m = tenants_state::write_tenants();
|
||||
m.get_mut(&tenant_id)
|
||||
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
|
||||
.state = old_state;
|
||||
drop(m);
|
||||
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
|
||||
return gc_spawn_result;
|
||||
}
|
||||
}
|
||||
(TenantState::Idle, TenantState::Stopping) => {
|
||||
info!("stopping idle tenant {tenant_id}");
|
||||
@@ -348,10 +379,8 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
|
||||
Some(tenant_id),
|
||||
None,
|
||||
);
|
||||
|
||||
// Freeze the repo, waiting for existing gc/compaction to finish
|
||||
let repo = get_repository_for_tenant(tenant_id)?;
|
||||
*repo.frozen.write().unwrap() = true;
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None);
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,271 +0,0 @@
|
||||
//! This module contains functions to serve per-tenant background processes,
|
||||
//! such as compaction and GC
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::ops::ControlFlow;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::repository::{RepoIoError, Repository};
|
||||
use crate::thread_mgr::ThreadKind;
|
||||
use crate::{tenant_mgr, thread_mgr};
|
||||
use anyhow::{self, Context};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use metrics::{register_int_counter_vec, IntCounterVec};
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::watch;
|
||||
use tracing::*;
|
||||
use utils::zid::ZTenantId;
|
||||
|
||||
static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"tenant_task_events",
|
||||
"Number of task start/stop/fail events.",
|
||||
&["event"],
|
||||
)
|
||||
.expect("Failed to register tenant_task_events metric")
|
||||
});
|
||||
|
||||
///
|
||||
/// Compaction task's main loop
|
||||
///
|
||||
async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
// Run blocking part of the task
|
||||
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let compaction_period = repo.get_compaction_period();
|
||||
match repo.compaction_iteration() {
|
||||
Ok(_) => Ok(ControlFlow::Continue(compaction_period)),
|
||||
Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => {
|
||||
Ok(ControlFlow::Break(()))
|
||||
}
|
||||
Err(RepoIoError::Other(e)) => Err(e),
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
// Decide whether to sleep or break
|
||||
let sleep_duration = match period {
|
||||
Ok(Ok(ControlFlow::Continue(period))) => period,
|
||||
Ok(Ok(ControlFlow::Break(()))) => break,
|
||||
Ok(Err(e)) => {
|
||||
error!("Compaction failed, retrying: {}", e);
|
||||
Duration::from_secs(2)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Compaction join error, retrying: {}", e);
|
||||
Duration::from_secs(2)
|
||||
}
|
||||
};
|
||||
|
||||
// Sleep
|
||||
tokio::select! {
|
||||
_ = cancel.changed() => {
|
||||
trace!("received cancellation request");
|
||||
break;
|
||||
},
|
||||
_ = tokio::time::sleep(sleep_duration) => {},
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
"compaction loop stopped. State is {:?}",
|
||||
tenant_mgr::get_tenant_state(tenantid)
|
||||
);
|
||||
}
|
||||
|
||||
static START_GC_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
|
||||
static START_COMPACTION_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
|
||||
|
||||
/// Spawn a task that will periodically schedule garbage collection until
|
||||
/// the tenant becomes inactive. This should be called on tenant
|
||||
/// activation.
|
||||
pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
START_GC_LOOP
|
||||
.get()
|
||||
.context("Failed to get START_GC_LOOP")?
|
||||
.blocking_send(tenantid)
|
||||
.context("Failed to send to START_GC_LOOP channel")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawn a task that will periodically schedule compaction until
|
||||
/// the tenant becomes inactive. This should be called on tenant
|
||||
/// activation.
|
||||
pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
START_COMPACTION_LOOP
|
||||
.get()
|
||||
.context("failed to get START_COMPACTION_LOOP")?
|
||||
.blocking_send(tenantid)
|
||||
.context("failed to send to START_COMPACTION_LOOP")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawn the TenantTaskManager
|
||||
/// This needs to be called before start_gc_loop or start_compaction_loop
|
||||
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("tenant-task-worker")
|
||||
.worker_threads(40) // Way more than necessary
|
||||
.max_blocking_threads(100) // Way more than necessary
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100);
|
||||
START_GC_LOOP
|
||||
.set(gc_send)
|
||||
.expect("Failed to set START_GC_LOOP");
|
||||
|
||||
let (compaction_send, mut compaction_recv) = mpsc::channel::<ZTenantId>(100);
|
||||
START_COMPACTION_LOOP
|
||||
.set(compaction_send)
|
||||
.expect("Failed to set START_COMPACTION_LOOP");
|
||||
|
||||
// TODO this is getting repetitive
|
||||
let mut gc_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
|
||||
let mut compaction_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
|
||||
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::TenantTaskManager,
|
||||
None,
|
||||
None,
|
||||
"Tenant task manager main thread",
|
||||
true,
|
||||
move || {
|
||||
runtime.block_on(async move {
|
||||
let mut futures = FuturesUnordered::new();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = thread_mgr::shutdown_watcher() => {
|
||||
// Send cancellation to all tasks
|
||||
for (_, cancel) in gc_loops.drain() {
|
||||
cancel.send(()).ok();
|
||||
}
|
||||
for (_, cancel) in compaction_loops.drain() {
|
||||
cancel.send(()).ok();
|
||||
}
|
||||
|
||||
// Exit after all tasks finish
|
||||
while let Some(result) = futures.next().await {
|
||||
match result {
|
||||
Ok(()) => {
|
||||
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
|
||||
},
|
||||
Err(e) => {
|
||||
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
|
||||
error!("loop join error {}", e)
|
||||
},
|
||||
}
|
||||
}
|
||||
break;
|
||||
},
|
||||
tenantid = gc_recv.recv() => {
|
||||
let tenantid = tenantid.expect("Gc task channel closed unexpectedly");
|
||||
|
||||
// Spawn new task, request cancellation of the old one if exists
|
||||
let (cancel_send, cancel_recv) = watch::channel(());
|
||||
let handle = tokio::spawn(gc_loop(tenantid, cancel_recv)
|
||||
.instrument(trace_span!("gc loop", tenant = %tenantid)));
|
||||
if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) {
|
||||
old_cancel_send.send(()).ok();
|
||||
}
|
||||
|
||||
// Update metrics, remember handle
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
futures.push(handle);
|
||||
},
|
||||
tenantid = compaction_recv.recv() => {
|
||||
let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");
|
||||
|
||||
// Spawn new task, request cancellation of the old one if exists
|
||||
let (cancel_send, cancel_recv) = watch::channel(());
|
||||
let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)
|
||||
.instrument(trace_span!("compaction loop", tenant = %tenantid)));
|
||||
if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) {
|
||||
old_cancel_send.send(()).ok();
|
||||
}
|
||||
|
||||
// Update metrics, remember handle
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
futures.push(handle);
|
||||
},
|
||||
result = futures.next() => {
|
||||
// Log and count any unhandled panics
|
||||
match result {
|
||||
Some(Ok(())) => {
|
||||
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
|
||||
},
|
||||
Some(Err(e)) => {
|
||||
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
|
||||
error!("loop join error {}", e)
|
||||
},
|
||||
None => {},
|
||||
};
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// GC task's main loop
|
||||
///
|
||||
async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
// Run blocking part of the task
|
||||
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let gc_period = repo.get_gc_period();
|
||||
let gc_horizon = repo.get_gc_horizon();
|
||||
if gc_horizon > 0 {
|
||||
match repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false) {
|
||||
Ok(_) => return Ok(ControlFlow::Continue(gc_period)),
|
||||
Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => {
|
||||
return Ok(ControlFlow::Break(()))
|
||||
}
|
||||
Err(RepoIoError::Other(e)) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(ControlFlow::Continue(gc_period))
|
||||
})
|
||||
.await;
|
||||
|
||||
// Decide whether to sleep or break
|
||||
let sleep_duration = match period {
|
||||
Ok(Ok(ControlFlow::Continue(period))) => period,
|
||||
Ok(Ok(ControlFlow::Break(()))) => break,
|
||||
Ok(Err(e)) => {
|
||||
error!("Gc failed, retrying: {}", e);
|
||||
Duration::from_secs(2)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Gc join error, retrying: {}", e);
|
||||
Duration::from_secs(2)
|
||||
}
|
||||
};
|
||||
|
||||
// Sleep
|
||||
tokio::select! {
|
||||
_ = cancel.changed() => {
|
||||
trace!("received cancellation request");
|
||||
break;
|
||||
},
|
||||
_ = tokio::time::sleep(sleep_duration) => {},
|
||||
}
|
||||
}
|
||||
trace!(
|
||||
"GC loop stopped. State is {:?}",
|
||||
tenant_mgr::get_tenant_state(tenantid)
|
||||
);
|
||||
}
|
||||
79
pageserver/src/tenant_threads.rs
Normal file
79
pageserver/src/tenant_threads.rs
Normal file
@@ -0,0 +1,79 @@
|
||||
//! This module contains functions to serve per-tenant background processes,
|
||||
//! such as compaction and GC
|
||||
use crate::repository::Repository;
|
||||
use crate::tenant_mgr;
|
||||
use crate::tenant_mgr::TenantState;
|
||||
use anyhow::Result;
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
use utils::zid::ZTenantId;
|
||||
|
||||
///
|
||||
/// Compaction thread's main loop
|
||||
///
|
||||
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
if let Err(err) = compact_loop_ext(tenantid) {
|
||||
error!("compact loop terminated with error: {:?}", err);
|
||||
Err(err)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
||||
loop {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
break;
|
||||
}
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let compaction_period = repo.get_compaction_period();
|
||||
|
||||
std::thread::sleep(compaction_period);
|
||||
trace!("compaction thread for tenant {} waking up", tenantid);
|
||||
|
||||
// Compact timelines
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
repo.compaction_iteration()?;
|
||||
}
|
||||
|
||||
trace!(
|
||||
"compaction thread stopped for tenant {} state is {:?}",
|
||||
tenantid,
|
||||
tenant_mgr::get_tenant_state(tenantid)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// GC thread's main loop
|
||||
///
|
||||
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
loop {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
break;
|
||||
}
|
||||
|
||||
trace!("gc thread for tenant {} waking up", tenantid);
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let gc_horizon = repo.get_gc_horizon();
|
||||
// Garbage collect old files that are not needed for PITR anymore
|
||||
if gc_horizon > 0 {
|
||||
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
|
||||
}
|
||||
|
||||
// TODO Write it in more adequate way using
|
||||
// condvar.wait_timeout() or something
|
||||
let mut sleep_time = repo.get_gc_period().as_secs();
|
||||
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
|
||||
{
|
||||
sleep_time -= 1;
|
||||
std::thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
trace!(
|
||||
"GC thread stopped for tenant {} state is {:?}",
|
||||
tenantid,
|
||||
tenant_mgr::get_tenant_state(tenantid)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -94,8 +94,11 @@ pub enum ThreadKind {
|
||||
// Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL.
|
||||
WalReceiverManager,
|
||||
|
||||
// Thread that schedules new compaction and gc jobs
|
||||
TenantTaskManager,
|
||||
// Thread that handles compaction of all timelines for a tenant.
|
||||
Compactor,
|
||||
|
||||
// Thread that handles GC of a tenant
|
||||
GarbageCollector,
|
||||
|
||||
// Thread that flushes frozen in-memory layers to disk
|
||||
LayerFlushThread,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
1133
pageserver/src/walreceiver/connection_manager.rs
Normal file
1133
pageserver/src/walreceiver/connection_manager.rs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,5 +1,5 @@
|
||||
//! Actual Postgres connection handler to stream WAL to the server.
|
||||
//! Runs as a separate, cancellable Tokio task.
|
||||
|
||||
use std::{
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
@@ -10,113 +10,29 @@ use anyhow::{bail, ensure, Context};
|
||||
use bytes::BytesMut;
|
||||
use fail::fail_point;
|
||||
use postgres::{SimpleQueryMessage, SimpleQueryRow};
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use postgres_types::PgLsn;
|
||||
use tokio::{pin, select, sync::watch, time};
|
||||
use tokio_postgres::{replication::ReplicationStream, Client};
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
|
||||
use utils::{
|
||||
lsn::Lsn,
|
||||
pq_proto::ReplicationFeedback,
|
||||
zid::{NodeId, ZTenantTimelineId},
|
||||
};
|
||||
|
||||
use super::TaskEvent;
|
||||
use crate::{
|
||||
http::models::WalReceiverEntry,
|
||||
repository::{Repository, Timeline},
|
||||
tenant_mgr,
|
||||
walingest::WalIngest,
|
||||
};
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum WalConnectionEvent {
|
||||
Started,
|
||||
NewWal(ReplicationFeedback),
|
||||
End(Result<(), String>),
|
||||
}
|
||||
|
||||
/// A wrapper around standalone Tokio task, to poll its updates or cancel the task.
|
||||
#[derive(Debug)]
|
||||
pub struct WalReceiverConnection {
|
||||
handle: tokio::task::JoinHandle<()>,
|
||||
cancellation: watch::Sender<()>,
|
||||
events_receiver: watch::Receiver<WalConnectionEvent>,
|
||||
}
|
||||
|
||||
impl WalReceiverConnection {
|
||||
/// Initializes the connection task, returning a set of handles on top of it.
|
||||
/// The task is started immediately after the creation, fails if no connection is established during the timeout given.
|
||||
pub fn open(
|
||||
id: ZTenantTimelineId,
|
||||
safekeeper_id: NodeId,
|
||||
wal_producer_connstr: String,
|
||||
connect_timeout: Duration,
|
||||
) -> Self {
|
||||
let (cancellation, mut cancellation_receiver) = watch::channel(());
|
||||
let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started);
|
||||
|
||||
let handle = tokio::spawn(
|
||||
async move {
|
||||
let connection_result = handle_walreceiver_connection(
|
||||
id,
|
||||
&wal_producer_connstr,
|
||||
&events_sender,
|
||||
&mut cancellation_receiver,
|
||||
connect_timeout,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
format!("Walreceiver connection for id {id} failed with error: {e:#}")
|
||||
});
|
||||
|
||||
match &connection_result {
|
||||
Ok(()) => {
|
||||
debug!("Walreceiver connection for id {id} ended successfully")
|
||||
}
|
||||
Err(e) => warn!("{e}"),
|
||||
}
|
||||
events_sender
|
||||
.send(WalConnectionEvent::End(connection_result))
|
||||
.ok();
|
||||
}
|
||||
.instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)),
|
||||
);
|
||||
|
||||
Self {
|
||||
handle,
|
||||
cancellation,
|
||||
events_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
/// Polls for the next WAL receiver event, if there's any available since the last check.
|
||||
/// Blocks if there's no new event available, returns `None` if no new events will ever occur.
|
||||
/// Only the last event is returned, all events received between observatins are lost.
|
||||
pub async fn next_event(&mut self) -> Option<WalConnectionEvent> {
|
||||
match self.events_receiver.changed().await {
|
||||
Ok(()) => Some(self.events_receiver.borrow().clone()),
|
||||
Err(_cancellation_error) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Gracefully aborts current WAL streaming task, waiting for the current WAL streamed.
|
||||
pub async fn shutdown(&mut self) -> anyhow::Result<()> {
|
||||
self.cancellation.send(()).ok();
|
||||
let handle = &mut self.handle;
|
||||
handle
|
||||
.await
|
||||
.context("Failed to join on a walreceiver connection task")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_walreceiver_connection(
|
||||
/// Opens a conneciton to the given wal producer and streams the WAL, sending progress messages during streaming.
|
||||
pub async fn handle_walreceiver_connection(
|
||||
id: ZTenantTimelineId,
|
||||
wal_producer_connstr: &str,
|
||||
events_sender: &watch::Sender<WalConnectionEvent>,
|
||||
cancellation: &mut watch::Receiver<()>,
|
||||
events_sender: &watch::Sender<TaskEvent<ReplicationFeedback>>,
|
||||
mut cancellation: watch::Receiver<()>,
|
||||
connect_timeout: Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
// Connect to the database in replication mode.
|
||||
@@ -214,8 +130,6 @@ async fn handle_walreceiver_connection(
|
||||
|
||||
while let Some(replication_message) = {
|
||||
select! {
|
||||
// check for shutdown first
|
||||
biased;
|
||||
_ = cancellation.changed() => {
|
||||
info!("walreceiver interrupted");
|
||||
None
|
||||
@@ -344,7 +258,7 @@ async fn handle_walreceiver_connection(
|
||||
.as_mut()
|
||||
.zenith_status_update(data.len() as u64, &data)
|
||||
.await?;
|
||||
if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) {
|
||||
if let Err(e) = events_sender.send(TaskEvent::NewEvent(zenith_status_update)) {
|
||||
warn!("Wal connection event listener dropped, aborting the connection: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
@@ -49,6 +49,12 @@ impl UserFacingError for ConsoleAuthError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&auth::credentials::ClientCredsParseError> for ConsoleAuthError {
|
||||
fn from(e: &auth::credentials::ClientCredsParseError) -> Self {
|
||||
ConsoleAuthError::BadProjectName(e.clone())
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: convert into an enum with "error"
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct GetRoleSecretResponse {
|
||||
@@ -94,7 +100,7 @@ impl<'a> Api<'a> {
|
||||
let mut url = self.endpoint.clone();
|
||||
url.path_segments_mut().push("proxy_get_role_secret");
|
||||
url.query_pairs_mut()
|
||||
.append_pair("project", &self.creds.project_name)
|
||||
.append_pair("project", self.creds.project_name.as_ref()?)
|
||||
.append_pair("role", &self.creds.user);
|
||||
|
||||
// TODO: use a proper logger
|
||||
@@ -117,8 +123,8 @@ impl<'a> Api<'a> {
|
||||
async fn wake_compute(&self) -> Result<DatabaseInfo> {
|
||||
let mut url = self.endpoint.clone();
|
||||
url.path_segments_mut().push("proxy_wake_compute");
|
||||
url.query_pairs_mut()
|
||||
.append_pair("project", &self.creds.project_name);
|
||||
let project_name = self.creds.project_name.as_ref()?;
|
||||
url.query_pairs_mut().append_pair("project", project_name);
|
||||
|
||||
// TODO: use a proper logger
|
||||
println!("cplane request: {url}");
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::collections::HashMap;
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
#[derive(Debug, Error, PartialEq)]
|
||||
#[derive(Debug, Error, PartialEq, Eq, Clone)]
|
||||
pub enum ClientCredsParseError {
|
||||
#[error("Parameter `{0}` is missing in startup packet.")]
|
||||
MissingKey(&'static str),
|
||||
@@ -44,7 +44,7 @@ impl UserFacingError for ClientCredsParseError {}
|
||||
pub struct ClientCredentials {
|
||||
pub user: String,
|
||||
pub dbname: String,
|
||||
pub project_name: String,
|
||||
pub project_name: Result<String, ClientCredsParseError>,
|
||||
}
|
||||
|
||||
impl ClientCredentials {
|
||||
@@ -67,7 +67,7 @@ impl ClientCredentials {
|
||||
let user = get_param("user")?;
|
||||
let dbname = get_param("database")?;
|
||||
let project_name = get_param("project").ok();
|
||||
let project_name = get_project_name(sni_data, common_name, project_name.as_deref())?;
|
||||
let project_name = get_project_name(sni_data, common_name, project_name.as_deref());
|
||||
|
||||
Ok(Self {
|
||||
user,
|
||||
|
||||
@@ -5,6 +5,11 @@ use anyhow::Context;
|
||||
use anyhow::Error;
|
||||
use anyhow::Result;
|
||||
use etcd_broker::subscription_value::SkTimelineInfo;
|
||||
use etcd_broker::LeaseKeepAliveStream;
|
||||
use etcd_broker::LeaseKeeper;
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use tokio::spawn;
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -21,7 +26,7 @@ use utils::zid::{NodeId, ZTenantTimelineId};
|
||||
|
||||
const RETRY_INTERVAL_MSEC: u64 = 1000;
|
||||
const PUSH_INTERVAL_MSEC: u64 = 1000;
|
||||
const LEASE_TTL_SEC: i64 = 5;
|
||||
const LEASE_TTL_SEC: i64 = 10;
|
||||
|
||||
pub fn thread_main(conf: SafeKeeperConf) {
|
||||
let runtime = runtime::Builder::new_current_thread()
|
||||
@@ -154,13 +159,48 @@ pub fn get_candiate_name(system_id: NodeId) -> String {
|
||||
format!("id_{system_id}")
|
||||
}
|
||||
|
||||
async fn push_sk_info(
|
||||
zttid: ZTenantTimelineId,
|
||||
mut client: Client,
|
||||
key: String,
|
||||
sk_info: SkTimelineInfo,
|
||||
mut lease: Lease,
|
||||
) -> anyhow::Result<(ZTenantTimelineId, Lease)> {
|
||||
let put_opts = PutOptions::new().with_lease(lease.id);
|
||||
client
|
||||
.put(
|
||||
key.clone(),
|
||||
serde_json::to_string(&sk_info)?,
|
||||
Some(put_opts),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("failed to push safekeeper info to {}", key))?;
|
||||
|
||||
// revive the lease
|
||||
lease
|
||||
.keeper
|
||||
.keep_alive()
|
||||
.await
|
||||
.context("failed to send LeaseKeepAliveRequest")?;
|
||||
lease
|
||||
.ka_stream
|
||||
.message()
|
||||
.await
|
||||
.context("failed to receive LeaseKeepAliveResponse")?;
|
||||
|
||||
Ok((zttid, lease))
|
||||
}
|
||||
|
||||
struct Lease {
|
||||
id: i64,
|
||||
keeper: LeaseKeeper,
|
||||
ka_stream: LeaseKeepAliveStream,
|
||||
}
|
||||
|
||||
/// Push once in a while data about all active timelines to the broker.
|
||||
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
|
||||
|
||||
// Get and maintain lease to automatically delete obsolete data
|
||||
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
|
||||
let (mut keeper, mut ka_stream) = client.lease_keep_alive(lease.id()).await?;
|
||||
let mut leases: HashMap<ZTenantTimelineId, Lease> = HashMap::new();
|
||||
|
||||
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
|
||||
loop {
|
||||
@@ -168,33 +208,46 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
// is under plain mutex. That's ok, all this code is not performance
|
||||
// sensitive and there is no risk of deadlock as we don't await while
|
||||
// lock is held.
|
||||
for zttid in GlobalTimelines::get_active_timelines() {
|
||||
if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
|
||||
let sk_info = tli.get_public_info(&conf)?;
|
||||
let put_opts = PutOptions::new().with_lease(lease.id());
|
||||
client
|
||||
.put(
|
||||
timeline_safekeeper_path(
|
||||
conf.broker_etcd_prefix.clone(),
|
||||
zttid,
|
||||
conf.my_id,
|
||||
),
|
||||
serde_json::to_string(&sk_info)?,
|
||||
Some(put_opts),
|
||||
)
|
||||
.await
|
||||
.context("failed to push safekeeper info")?;
|
||||
let active_tlis = GlobalTimelines::get_active_timelines();
|
||||
|
||||
// // Get and maintain (if not yet) per timeline lease to automatically delete obsolete data.
|
||||
for zttid in active_tlis.iter() {
|
||||
if let Entry::Vacant(v) = leases.entry(*zttid) {
|
||||
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
|
||||
let (keeper, ka_stream) = client.lease_keep_alive(lease.id()).await?;
|
||||
v.insert(Lease {
|
||||
id: lease.id(),
|
||||
keeper,
|
||||
ka_stream,
|
||||
});
|
||||
}
|
||||
}
|
||||
// revive the lease
|
||||
keeper
|
||||
.keep_alive()
|
||||
.await
|
||||
.context("failed to send LeaseKeepAliveRequest")?;
|
||||
ka_stream
|
||||
.message()
|
||||
.await
|
||||
.context("failed to receive LeaseKeepAliveResponse")?;
|
||||
leases.retain(|zttid, _| active_tlis.contains(zttid));
|
||||
|
||||
// Push data concurrently to not suffer from latency, with many timelines it can be slow.
|
||||
let handles = active_tlis
|
||||
.iter()
|
||||
.filter_map(|zttid| GlobalTimelines::get_loaded(*zttid))
|
||||
.map(|tli| {
|
||||
let sk_info = tli.get_public_info(&conf);
|
||||
let key = timeline_safekeeper_path(
|
||||
conf.broker_etcd_prefix.clone(),
|
||||
tli.zttid,
|
||||
conf.my_id,
|
||||
);
|
||||
let lease = leases.remove(&tli.zttid).unwrap();
|
||||
tokio::spawn(push_sk_info(tli.zttid, client.clone(), key, sk_info, lease))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
for h in handles {
|
||||
let (zttid, lease) = h.await??;
|
||||
// It is ugly to pull leases from hash and then put it back, but
|
||||
// otherwise we have to resort to long living per tli tasks (which
|
||||
// would generate a lot of errors when etcd is down) as task wants to
|
||||
// have 'static objects, we can't borrow to it.
|
||||
leases.insert(zttid, lease);
|
||||
}
|
||||
|
||||
sleep(push_interval).await;
|
||||
}
|
||||
}
|
||||
@@ -221,15 +274,12 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||
.await
|
||||
.context("failed to subscribe for safekeeper info")?;
|
||||
loop {
|
||||
match subscription.fetch_data().await {
|
||||
match subscription.value_updates.recv().await {
|
||||
Some(new_info) => {
|
||||
for (zttid, sk_info) in new_info {
|
||||
// note: there are blocking operations below, but it's considered fine for now
|
||||
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
|
||||
for (safekeeper_id, info) in sk_info {
|
||||
tli.record_safekeeper_info(&info, safekeeper_id).await?
|
||||
}
|
||||
}
|
||||
// note: there are blocking operations below, but it's considered fine for now
|
||||
if let Ok(tli) = GlobalTimelines::get(&conf, new_info.key.id, false) {
|
||||
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
None => {
|
||||
|
||||
@@ -239,6 +239,19 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
|
||||
remote_consistent_lsn: Lsn(0),
|
||||
peers: Peers(vec![]),
|
||||
});
|
||||
} else if version == 5 {
|
||||
info!("reading safekeeper control file version {}", version);
|
||||
let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
|
||||
if oldstate.timeline_start_lsn != Lsn(0) {
|
||||
return Ok(oldstate);
|
||||
}
|
||||
|
||||
// set special timeline_start_lsn because we don't know the real one
|
||||
info!("setting timeline_start_lsn and local_start_lsn to Lsn(1)");
|
||||
oldstate.timeline_start_lsn = Lsn(1);
|
||||
oldstate.local_start_lsn = Lsn(1);
|
||||
|
||||
return Ok(oldstate);
|
||||
}
|
||||
bail!("unsupported safekeeper control file version {}", version)
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use utils::{
|
||||
};
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 5;
|
||||
pub const SK_FORMAT_VERSION: u32 = 6;
|
||||
const SK_PROTOCOL_VERSION: u32 = 2;
|
||||
const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use serde::Serialize;
|
||||
use tokio::sync::watch;
|
||||
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs::{self};
|
||||
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
@@ -445,9 +445,9 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Prepare public safekeeper info for reporting.
|
||||
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result<SkTimelineInfo> {
|
||||
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
|
||||
let shared_state = self.mutex.lock().unwrap();
|
||||
Ok(SkTimelineInfo {
|
||||
SkTimelineInfo {
|
||||
last_log_term: Some(shared_state.sk.get_epoch()),
|
||||
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
|
||||
// note: this value is not flushed to control file yet and can be lost
|
||||
@@ -460,7 +460,7 @@ impl Timeline {
|
||||
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
|
||||
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
|
||||
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Update timeline state with peer safekeeper data.
|
||||
@@ -625,6 +625,8 @@ impl GlobalTimelines {
|
||||
zttid: ZTenantTimelineId,
|
||||
create: bool,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let _enter = info_span!("", timeline = %zttid.tenant_id).entered();
|
||||
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
|
||||
match state.timelines.get(&zttid) {
|
||||
@@ -667,7 +669,7 @@ impl GlobalTimelines {
|
||||
}
|
||||
|
||||
/// Get ZTenantTimelineIDs of all active timelines.
|
||||
pub fn get_active_timelines() -> Vec<ZTenantTimelineId> {
|
||||
pub fn get_active_timelines() -> HashSet<ZTenantTimelineId> {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
state
|
||||
.timelines
|
||||
|
||||
@@ -45,7 +45,7 @@ If you want to run all tests that have the string "bench" in their names:
|
||||
|
||||
Useful environment variables:
|
||||
|
||||
`ZENITH_BIN`: The directory where zenith binaries can be found.
|
||||
`NEON_BIN`: The directory where neon binaries can be found.
|
||||
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
|
||||
`TEST_OUTPUT`: Set the directory where test state and test output files
|
||||
should go.
|
||||
|
||||
@@ -35,9 +35,14 @@ def test_createdb(neon_simple_env: NeonEnv):
|
||||
with closing(db.connect(dbname='foodb')) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Check database size in both branches
|
||||
cur.execute(
|
||||
'select pg_size_pretty(pg_database_size(%s)), pg_size_pretty(sum(pg_relation_size(oid))) from pg_class where relisshared is false;',
|
||||
('foodb', ))
|
||||
cur.execute("""
|
||||
select pg_size_pretty(pg_database_size('foodb')),
|
||||
pg_size_pretty(
|
||||
sum(pg_relation_size(oid, 'main'))
|
||||
+sum(pg_relation_size(oid, 'vm'))
|
||||
+sum(pg_relation_size(oid, 'fsm'))
|
||||
) FROM pg_class where relisshared is false
|
||||
""")
|
||||
res = cur.fetchone()
|
||||
# check that dbsize equals sum of all relation sizes, excluding shared ones
|
||||
# This is how we define dbsize in neon for now
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# It's possible to run any regular test with the local fs remote storage via
|
||||
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/zenith_zzz/'}" poetry ......
|
||||
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
|
||||
|
||||
import shutil, os
|
||||
from contextlib import closing
|
||||
|
||||
@@ -8,7 +8,6 @@ import time
|
||||
|
||||
def test_timeline_size(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
# Branch at the point where only 100 rows were inserted
|
||||
new_timeline_id = env.neon_cli.create_branch('test_timeline_size', 'empty')
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
@@ -23,7 +22,6 @@ def test_timeline_size(neon_simple_env: NeonEnv):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SHOW neon.timeline_id")
|
||||
|
||||
# Create table, and insert the first 100 rows
|
||||
cur.execute("CREATE TABLE foo (t text)")
|
||||
cur.execute("""
|
||||
INSERT INTO foo
|
||||
@@ -43,6 +41,51 @@ def test_timeline_size(neon_simple_env: NeonEnv):
|
||||
"current_logical_size_non_incremental"]
|
||||
|
||||
|
||||
def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
new_timeline_id = env.neon_cli.create_branch('test_timeline_size', 'empty')
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
timeline_details = assert_local(client, env.initial_tenant, new_timeline_id)
|
||||
assert timeline_details['local']['current_logical_size'] == timeline_details['local'][
|
||||
'current_logical_size_non_incremental']
|
||||
|
||||
pgmain = env.postgres.create_start("test_timeline_size")
|
||||
log.info("postgres is running on 'test_timeline_size' branch")
|
||||
|
||||
with closing(pgmain.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SHOW neon.timeline_id")
|
||||
|
||||
res = assert_local(client, env.initial_tenant, new_timeline_id)
|
||||
local_details = res['local']
|
||||
assert local_details["current_logical_size"] == local_details[
|
||||
"current_logical_size_non_incremental"]
|
||||
|
||||
cur.execute('CREATE DATABASE foodb')
|
||||
with closing(pgmain.connect(dbname='foodb')) as conn:
|
||||
with conn.cursor() as cur2:
|
||||
|
||||
cur2.execute("CREATE TABLE foo (t text)")
|
||||
cur2.execute("""
|
||||
INSERT INTO foo
|
||||
SELECT 'long string to consume some space' || g
|
||||
FROM generate_series(1, 10) g
|
||||
""")
|
||||
|
||||
res = assert_local(client, env.initial_tenant, new_timeline_id)
|
||||
local_details = res['local']
|
||||
assert local_details["current_logical_size"] == local_details[
|
||||
"current_logical_size_non_incremental"]
|
||||
|
||||
cur.execute('DROP DATABASE foodb')
|
||||
|
||||
res = assert_local(client, env.initial_tenant, new_timeline_id)
|
||||
local_details = res['local']
|
||||
assert local_details["current_logical_size"] == local_details[
|
||||
"current_logical_size_non_incremental"]
|
||||
|
||||
|
||||
# wait until received_lsn_lag is 0
|
||||
def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60):
|
||||
started_at = time.time()
|
||||
|
||||
@@ -50,7 +50,7 @@ A fixture is created with the decorator @pytest.fixture decorator.
|
||||
See docs: https://docs.pytest.org/en/6.2.x/fixture.html
|
||||
|
||||
There are several environment variables that can control the running of tests:
|
||||
ZENITH_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
|
||||
NEON_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
|
||||
|
||||
There's no need to import this file to use it. It should be declared as a plugin
|
||||
inside conftest.py, and that makes it available to all tests.
|
||||
@@ -151,7 +151,7 @@ def pytest_configure(config):
|
||||
return
|
||||
# Find the neon binaries.
|
||||
global neon_binpath
|
||||
env_neon_bin = os.environ.get('ZENITH_BIN')
|
||||
env_neon_bin = os.environ.get('NEON_BIN')
|
||||
if env_neon_bin:
|
||||
neon_binpath = env_neon_bin
|
||||
else:
|
||||
|
||||
@@ -80,6 +80,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
|
||||
thread.join()
|
||||
|
||||
|
||||
@pytest.mark.timeout(1000)
|
||||
@pytest.mark.parametrize("n_tables", [5])
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix(5))
|
||||
@pytest.mark.parametrize("num_iters", [10])
|
||||
@@ -121,6 +122,7 @@ def start_pgbench_simple_update_workload(env: PgCompare, duration: int):
|
||||
env.flush()
|
||||
|
||||
|
||||
@pytest.mark.timeout(1000)
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix(100))
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
def test_pgbench_simple_update_workload(pg_compare: PgCompare, scale: int, duration: int):
|
||||
@@ -158,6 +160,7 @@ def start_pgbench_intensive_initialization(env: PgCompare, scale: int):
|
||||
])
|
||||
|
||||
|
||||
@pytest.mark.timeout(1000)
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix(1000))
|
||||
def test_pgbench_intensive_init_workload(pg_compare: PgCompare, scale: int):
|
||||
env = pg_compare
|
||||
|
||||
Reference in New Issue
Block a user