Compare commits

..

28 Commits

Author SHA1 Message Date
Bojan Serafimov
802ef85cfb Add repo state 2022-06-28 10:34:37 -04:00
Bojan Serafimov
2b4c3cb932 Handle errors on shutdown 2022-06-28 09:29:59 -04:00
Bojan Serafimov
bf76f43ea4 Wait for tasks to complete 2022-06-27 17:54:31 -04:00
Bojan Serafimov
98062865f4 Handle errors inside loop 2022-06-24 17:08:44 -04:00
Bojan Serafimov
cdc81996b4 Add metrics 2022-06-24 15:28:53 -04:00
Bojan Serafimov
1169e9ea4c Rename threads to tasks 2022-06-24 11:08:55 -04:00
Bojan Serafimov
3a23869780 Log errors 2022-06-24 11:05:07 -04:00
Bojan Serafimov
796ee4d8af Instrument the task, not the await 2022-06-24 10:29:42 -04:00
Bojan Serafimov
b31ce411d2 Remove unnecessary map_err 2022-06-24 09:50:14 -04:00
Bojan Serafimov
24a5bd10a0 Add cancellation 2022-06-23 23:25:48 -04:00
Bojan Serafimov
2c029d9803 Remove redundant error context 2022-06-23 22:32:15 -04:00
Bojan Serafimov
763b00ccee Merge branch 'tenant-tasks' of github.com:neondatabase/neon into tenant-tasks 2022-06-23 14:15:14 -04:00
Bojan Serafimov
c44c8a0ea0 Add comment 2022-06-23 14:14:39 -04:00
Bojan Serafimov
692496d733 Cancel tasks 2022-06-23 13:20:53 -04:00
Bojan Serafimov
0f4552a544 Update TODO 2022-06-23 12:59:20 -04:00
Bojan Serafimov
d7d4cc8c77 Error instead of panic 2022-06-23 12:56:21 -04:00
Bojan Serafimov
9aab1d0f2b Expand blocking scope 2022-06-22 16:04:42 -04:00
Bojan Serafimov
83dc93ab0f Merge branch 'main' into tenant-tasks 2022-06-22 14:49:45 -04:00
Bojan Serafimov
9a9a58d52c Fmt 2022-06-15 09:49:07 -04:00
Bojan Serafimov
865e8740a7 Add docs 2022-06-15 09:35:41 -04:00
Bojan Serafimov
1a5d1a15d0 Use tokio sleep instead 2022-06-15 09:33:56 -04:00
Bojan Serafimov
02a9883f0f Add TenantTaskManager 2022-06-15 09:24:46 -04:00
Bojan Serafimov
ee36ca54d5 Run compaction as task 2022-06-15 09:16:57 -04:00
Bojan Serafimov
36cc6d2928 Fmt 2022-06-14 15:07:36 -04:00
Bojan Serafimov
e1a4c06918 Fix init 2022-06-14 15:05:30 -04:00
Bojan Serafimov
ec4528505e Simplify 2022-06-14 12:38:09 -04:00
Bojan Serafimov
c79e72e835 Add runtime 2022-06-14 11:08:17 -04:00
Bojan Serafimov
a1f85715ac WIP 2022-06-14 09:39:55 -04:00
46 changed files with 1862 additions and 2306 deletions

View File

@@ -1,7 +1,6 @@
[pageservers]
#zenith-us-stage-ps-1 console_region_id=27
zenith-us-stage-ps-2 console_region_id=27
zenith-us-stage-ps-3 console_region_id=27
[safekeepers]
zenith-us-stage-sk-4 console_region_id=27

View File

@@ -100,8 +100,10 @@ jobs:
name: Rust build << parameters.build_type >>
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS="--release --features profiling"
fi
@@ -110,7 +112,7 @@ jobs:
export RUSTC_WRAPPER=cachepot
export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
cachepot -s
- save_cache:
@@ -126,24 +128,32 @@ jobs:
name: cargo test
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS=--release
fi
cargo test $CARGO_FLAGS
"${cov_prefix[@]}" cargo test $CARGO_FLAGS
# Install the rust binaries, for use by test jobs
- run:
name: Install rust binaries
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
binaries=$(
cargo metadata --format-version=1 --no-deps |
"${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
)
test_exe_paths=$(
cargo test --message-format=json --no-run |
"${cov_prefix[@]}" cargo test --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
@@ -156,15 +166,34 @@ jobs:
SRC=target/$BUILD_TYPE/$bin
DST=/tmp/zenith/bin/$bin
cp $SRC $DST
echo $DST >> /tmp/zenith/etc/binaries.list
done
# Install test executables (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
for bin in $test_exe_paths; do
SRC=$bin
DST=/tmp/zenith/test_bin/$(basename $bin)
cp $SRC $DST
echo $DST >> /tmp/zenith/etc/binaries.list
done
fi
# Install the postgres binaries, for use by test jobs
- run:
name: Install postgres binaries
command: |
cp -a tmp_install /tmp/zenith/pg_install
# Save rust binaries for other jobs in the workflow
- run:
name: Merge coverage data
command: |
# This will speed up workspace uploads
if [[ $BUILD_TYPE == "debug" ]]; then
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
fi
# Save the rust binaries and coverage data for other jobs in this workflow.
- persist_to_workspace:
root: /tmp/zenith
paths:
@@ -257,7 +286,7 @@ jobs:
# no_output_timeout, specified here.
no_output_timeout: 10m
environment:
- NEON_BIN: /tmp/zenith/bin
- ZENITH_BIN: /tmp/zenith/bin
- POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install
- TEST_OUTPUT: /tmp/test_output
# this variable will be embedded in perf test report
@@ -285,6 +314,12 @@ jobs:
export GITHUB_SHA=$CIRCLE_SHA1
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
# Run the tests.
#
# The junit.xml file allows CircleCI to display more fine-grained test information
@@ -295,7 +330,7 @@ jobs:
# -n4 uses four processes to run tests via pytest-xdist
# -s is not used to prevent pytest from capturing output, because tests are running
# in parallel and logs are mixed between different tests
./scripts/pytest \
"${cov_prefix[@]}" ./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--tb=short \
--verbose \
@@ -324,12 +359,67 @@ jobs:
# The store_test_results step tells CircleCI where to find the junit.xml file.
- store_test_results:
path: /tmp/test_output
# Save data (if any)
- run:
name: Merge coverage data
command: |
# This will speed up workspace uploads
if [[ $BUILD_TYPE == "debug" ]]; then
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
fi
# Save coverage data (if any)
- persist_to_workspace:
root: /tmp/zenith
paths:
- "*"
coverage-report:
executor: neon-xlarge-executor
steps:
- attach_workspace:
at: /tmp/zenith
- checkout
- restore_cache:
name: Restore rust cache
keys:
# Require an exact match. While an out of date cache might speed up the build,
# there's no way to clean out old packages, so the cache grows every time something
# changes.
- v04-rust-cache-deps-debug-{{ checksum "Cargo.lock" }}
- run:
name: Build coverage report
command: |
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
scripts/coverage \
--dir=/tmp/zenith/coverage report \
--input-objects=/tmp/zenith/etc/binaries.list \
--commit-url=$COMMIT_URL \
--format=github
- run:
name: Upload coverage report
command: |
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
REPORT_URL=https://neondatabase.github.io/zenith-coverage-data/$CIRCLE_SHA1
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
scripts/git-upload \
--repo=https://$VIP_VAP_ACCESS_TOKEN@github.com/neondatabase/zenith-coverage-data.git \
--message="Add code coverage for $COMMIT_URL" \
copy /tmp/zenith/coverage/report $CIRCLE_SHA1 # COPY FROM TO_RELATIVE
# Add link to the coverage report to the commit
curl -f -X POST \
https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \
-H "Accept: application/vnd.github.v3+json" \
--user "$CI_ACCESS_TOKEN" \
--data \
"{
\"state\": \"success\",
\"context\": \"zenith-coverage\",
\"description\": \"Coverage report is ready\",
\"target_url\": \"$REPORT_URL\"
}"
# Build neondatabase/neon:latest image and push it to Docker hub
docker-image:
docker:
@@ -598,6 +688,50 @@ jobs:
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy.yaml --set image.tag=${DOCKER_TAG} --wait
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait
# Trigger a new remote CI job
remote-ci-trigger:
docker:
- image: cimg/base:2021.04
parameters:
remote_repo:
type: string
environment:
REMOTE_REPO: << parameters.remote_repo >>
steps:
- run:
name: Set PR's status to pending
command: |
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
curl -f -X POST \
https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \
-H "Accept: application/vnd.github.v3+json" \
--user "$CI_ACCESS_TOKEN" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
- run:
name: Request a remote CI test
command: |
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "$CI_ACCESS_TOKEN" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$CIRCLE_SHA1\",
\"remote_repo\": \"$LOCAL_REPO\"
}
}"
workflows:
build_and_test:
jobs:
@@ -640,6 +774,12 @@ workflows:
save_perf_report: true
requires:
- build-neon-release
- coverage-report:
# Context passes credentials for gh api
context: CI_ACCESS_TOKEN
requires:
# TODO: consider adding more
- other-tests-debug
- docker-image:
# Context gives an ability to login
context: Docker Hub
@@ -740,3 +880,14 @@ workflows:
- release
requires:
- docker-image-release
- remote-ci-trigger:
# Context passes credentials for gh api
context: CI_ACCESS_TOKEN
remote_repo: "neondatabase/cloud"
requires:
# XXX: Successful build doesn't mean everything is OK, but
# the job to be triggered takes so much time to complete (~22 min)
# that it's better not to wait for the commented-out steps
- build-neon-release
# - pg_regress-tests-release
# - other-tests-release

View File

@@ -2,29 +2,25 @@ name: 'Run python test'
description: 'Runs a Neon python test set, performing all the required preparations before'
inputs:
# Select the type of Rust build. Must be "release" or "debug".
build_type:
description: 'Type of Rust (neon) and C (postgres) builds. Must be "release" or "debug".'
required: true
rust_toolchain:
description: 'Rust toolchain version to fetch the caches'
required: true
# This parameter is required, to prevent the mistake of running all tests in one job.
test_selection:
description: 'A python test suite to run'
required: true
# Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr
extra_params:
description: 'Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr'
required: false
default: ''
needs_postgres_source:
description: 'Set to true if the test suite requires postgres source checked out'
required: false
default: 'false'
run_in_parallel:
description: 'Whether to run tests in parallel'
required: false
default: 'true'
save_perf_report:
description: 'Whether to upload the performance report'
required: false
default: 'false'
@@ -64,7 +60,7 @@ runs:
- name: Run pytest
env:
NEON_BIN: /tmp/neon/bin
ZENITH_BIN: /tmp/neon/bin
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
TEST_OUTPUT: /tmp/test_output
# this variable will be embedded in perf test report
@@ -92,7 +88,7 @@ runs:
fi
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
cov_prefix=()
fi
@@ -121,20 +117,3 @@ runs:
scripts/generate_and_push_perf_report.sh
fi
fi
- name: Delete all data but logs
shell: bash -ex {0}
if: always()
run: |
du -sh /tmp/test_output/*
find /tmp/test_output -type f ! -name "*.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" ! -name "flamegraph.svg" ! -name "*.metrics" -delete
du -sh /tmp/test_output/*
- name: Upload python test logs
if: always()
uses: actions/upload-artifact@v3
with:
retention-days: 7
if-no-files-found: error
name: python-test-${{ inputs.test_selection }}-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-logs
path: /tmp/test_output/

View File

@@ -1,17 +0,0 @@
name: 'Merge and upload coverage data'
description: 'Compresses and uploads the coverage data as an artifact'
runs:
using: "composite"
steps:
- name: Merge coverage data
shell: bash -ex {0}
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
- name: Upload coverage data
uses: actions/upload-artifact@v3
with:
retention-days: 7
if-no-files-found: error
name: coverage-data-artifact
path: /tmp/coverage/

View File

@@ -1,28 +1,13 @@
name: Test
on:
push:
branches:
- main
pull_request:
name: build_and_test
on: [ push ]
defaults:
run:
shell: bash -ex {0}
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
jobs:
build-postgres:
runs-on: [ self-hosted, Linux, k8s-runner ]
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
rust_toolchain: [ 1.58 ]
@@ -49,7 +34,7 @@ jobs:
- name: Build postgres
if: steps.cache_pg.outputs.cache-hit != 'true'
run: mold -run make postgres -j$(nproc)
run: COPT='-Werror' mold -run make postgres -j$(nproc)
# actions/cache@v3 does not allow concurrently using the same cache across job steps, so use a separate cache
- name: Prepare postgres artifact
@@ -67,7 +52,6 @@ jobs:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-postgres ]
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
rust_toolchain: [ 1.58 ]
@@ -101,39 +85,44 @@ jobs:
~/.cargo/registry/
~/.cargo/git/
target/
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: |
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
- name: Run cargo build
run: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS="--release --features profiling"
fi
export CACHEPOT_BUCKET=zenith-rust-cachepot
export RUSTC_WRAPPER=cachepot
export AWS_ACCESS_KEY_ID="${{ secrets.AWS_ACCESS_KEY_ID }}"
export AWS_SECRET_ACCESS_KEY="${{ secrets.AWS_SECRET_ACCESS_KEY }}"
export HOME=/home/runner
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
cachepot -s
- name: Run cargo test
run: |
export HOME=/home/runner
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS=--release
fi
"${cov_prefix[@]}" cargo test $CARGO_FLAGS
- name: Install rust binaries
run: |
export HOME=/home/runner
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
@@ -148,36 +137,39 @@ jobs:
jq -r '.executable | select(. != null)'
)
mkdir -p /tmp/neon/bin/
mkdir -p /tmp/neon/test_bin/
mkdir -p /tmp/neon/etc/
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
mkdir -p /tmp/neon/bin
mkdir -p /tmp/neon/test_bin
mkdir -p /tmp/neon/etc
# Install target binaries
for bin in $binaries; do
SRC=target/$BUILD_TYPE/$bin
DST=/tmp/neon/bin/$bin
cp "$SRC" "$DST"
cp $SRC $DST
echo $DST >> /tmp/neon/etc/binaries.list
done
# Install test executables and write list of all binaries (for code coverage)
# Install test executables (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
for bin in $binaries; do
echo "/tmp/neon/bin/$bin" >> /tmp/coverage/binaries.list
done
for bin in $test_exe_paths; do
SRC=$bin
DST=/tmp/neon/test_bin/$(basename $bin)
cp "$SRC" "$DST"
echo "$DST" >> /tmp/coverage/binaries.list
cp $SRC $DST
echo $DST >> /tmp/neon/etc/binaries.list
done
fi
- name: Install postgres binaries
run: cp -a tmp_install /tmp/neon/pg_install
- name: Merge coverage data
run: |
export HOME=/home/runner
# This will speed up workspace uploads
if [[ $BUILD_TYPE == "debug" ]]; then
scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage merge
fi
- name: Prepare neon artifact
run: tar -C /tmp/neon/ -czf ./neon.tgz .
@@ -189,17 +181,38 @@ jobs:
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
path: ./neon.tgz
# XXX: keep this after the binaries.list is formed, so the coverage can properly work later
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
check-codestyle-python:
runs-on: [ self-hosted, Linux, k8s-runner ]
strategy:
matrix:
rust_toolchain: [ 1.58 ]
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
- name: Run yapf to ensure code format
run: poetry run yapf --recursive --diff .
- name: Run mypy to check types
run: poetry run mypy .
pg_regress-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
rust_toolchain: [ 1.58 ]
@@ -218,15 +231,10 @@ jobs:
test_selection: batch_pg_regress
needs_postgres_source: true
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
other-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
rust_toolchain: [ 1.58 ]
@@ -244,15 +252,10 @@ jobs:
rust_toolchain: ${{ matrix.rust_toolchain }}
test_selection: batch_others
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
benchmarks:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
strategy:
fail-fast: false
matrix:
build_type: [ release ]
rust_toolchain: [ 1.58 ]
@@ -270,120 +273,4 @@ jobs:
rust_toolchain: ${{ matrix.rust_toolchain }}
test_selection: performance
run_in_parallel: false
save_perf_report: true
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
coverage-report:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ other-tests, pg_regress-tests ]
strategy:
fail-fast: false
matrix:
build_type: [ debug ]
rust_toolchain: [ 1.58 ]
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
- name: Restore cargo deps cache
id: cache_cargo
uses: actions/cache@v3
with:
path: |
~/.cargo/registry/
~/.cargo/git/
target/
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact for restoration
uses: actions/download-artifact@v3
with:
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
path: ./neon-artifact/
- name: Extract Neon artifact
run: |
mkdir -p /tmp/neon/
tar -xf ./neon-artifact/neon.tgz -C /tmp/neon/
rm -rf ./neon-artifact/
- name: Restore coverage data
uses: actions/download-artifact@v3
with:
name: coverage-data-artifact
path: /tmp/coverage/
- name: Merge coverage data
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
- name: Build and upload coverage report
run: |
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
COMMIT_URL=https://github.com/${{ github.repository }}/commit/$COMMIT_SHA
scripts/coverage \
--dir=/tmp/coverage report \
--input-objects=/tmp/coverage/binaries.list \
--commit-url=$COMMIT_URL \
--format=github
REPORT_URL=https://${{ github.repository_owner }}.github.io/zenith-coverage-data/$COMMIT_SHA
scripts/git-upload \
--repo=https://${{ secrets.VIP_VAP_ACCESS_TOKEN }}@github.com/${{ github.repository_owner }}/zenith-coverage-data.git \
--message="Add code coverage for $COMMIT_URL" \
copy /tmp/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
# Add link to the coverage report to the commit
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"success\",
\"context\": \"neon-coverage\",
\"description\": \"Coverage report is ready\",
\"target_url\": \"$REPORT_URL\"
}"
trigger-e2e-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
REMOTE_REPO="${{ github.repository_owner }}/cloud"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\"
}
}"
# save_perf_report: true

View File

@@ -1,4 +1,4 @@
name: Check code style and build
name: Build and Test
on:
push:
@@ -6,27 +6,15 @@ on:
- main
pull_request:
defaults:
run:
shell: bash -ex {0}
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
jobs:
check-codestyle-rust:
regression-check:
strategy:
fail-fast: false
matrix:
# If we want to duplicate this job for different
# Rust toolchains (e.g. nightly or 1.37.0), add them here.
rust_toolchain: [1.58]
os: [ubuntu-latest, macos-latest]
timeout-minutes: 50
timeout-minutes: 30
name: run regression test suite
runs-on: ${{ matrix.os }}
@@ -104,30 +92,5 @@ jobs:
- name: Run cargo clippy
run: ./run_clippy.sh
- name: Ensure all project builds
run: cargo build --all --all-targets
check-codestyle-python:
runs-on: [ self-hosted, Linux, k8s-runner ]
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: false
fetch-depth: 1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
- name: Run yapf to ensure code format
run: poetry run yapf --recursive --diff .
- name: Run mypy to check types
run: poetry run mypy .
- name: Run cargo test
run: cargo test --all --all-targets

7
Cargo.lock generated
View File

@@ -461,7 +461,6 @@ dependencies = [
"tar",
"tokio",
"tokio-postgres",
"urlencoding",
"workspace_hack",
]
@@ -3685,12 +3684,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "urlencoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68b90931029ab9b034b300b797048cf23723400aa757e8a2bfb9d748102f9821"
[[package]]
name = "utils"
version = "0.1.0"

View File

@@ -1,5 +1,5 @@
# Build Postgres
FROM neondatabase/rust:1.58 AS pg-build
FROM zimg/rust:1.58 AS pg-build
WORKDIR /pg
USER root
@@ -14,7 +14,7 @@ RUN set -e \
&& tar -C tmp_install -czf /postgres_install.tar.gz .
# Build zenith binaries
FROM neondatabase/rust:1.58 AS build
FROM zimg/rust:1.58 AS build
ARG GIT_VERSION=local
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
@@ -46,9 +46,9 @@ RUN set -e \
&& useradd -d /data zenith \
&& chown -R zenith:zenith /data
COPY --from=build --chown=zenith:zenith /home/runner/target/release/pageserver /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/runner/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/runner/target/release/proxy /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/pageserver /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/proxy /usr/local/bin
COPY --from=pg-build /pg/tmp_install/ /usr/local/
COPY --from=pg-build /postgres_install.tar.gz /data/

View File

@@ -1,6 +1,6 @@
# First transient image to build compute_tools binaries
# NB: keep in sync with rust image version in .circle/config.yml
FROM neondatabase/rust:1.58 AS rust-build
FROM zimg/rust:1.58 AS rust-build
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
ARG AWS_ACCESS_KEY_ID
@@ -15,4 +15,4 @@ RUN set -e \
# Final image that only has one binary
FROM debian:buster-slim
COPY --from=rust-build /home/runner/target/release/compute_ctl /usr/local/bin/compute_ctl
COPY --from=rust-build /home/circleci/project/target/release/compute_ctl /usr/local/bin/compute_ctl

View File

@@ -53,7 +53,7 @@ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
1. Install XCode and dependencies
```
xcode-select --install
brew install protobuf etcd openssl
brew install protobuf etcd
```
2. [Install Rust](https://www.rust-lang.org/tools/install)

View File

@@ -18,5 +18,4 @@ serde_json = "1"
tar = "0.4"
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
urlencoding = "2.1.0"
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -289,7 +289,6 @@ impl ComputeNode {
handle_roles(&self.spec, &mut client)?;
handle_databases(&self.spec, &mut client)?;
handle_role_deletions(self, &mut client)?;
handle_grants(&self.spec, &mut client)?;
create_writablity_check_data(&mut client)?;

View File

@@ -2,11 +2,9 @@ use std::path::Path;
use anyhow::Result;
use log::{info, log_enabled, warn, Level};
use postgres::{Client, NoTls};
use postgres::Client;
use serde::Deserialize;
use urlencoding::encode;
use crate::compute::ComputeNode;
use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
@@ -99,13 +97,18 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
// Process delta operations first
if let Some(ops) = &spec.delta_operations {
info!("processing role renames");
info!("processing delta operations on roles");
for op in ops {
match op.action.as_ref() {
// We do not check either role exists or not,
// Postgres will take care of it for us
"delete_role" => {
// no-op now, roles will be deleted at the end of configuration
let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.quote());
warn!("deleting role '{}'", &op.name);
xact.execute(query.as_str(), &[])?;
}
// Renaming role drops its password, since role name is
// Renaming role drops its password, since tole name is
// used as a salt there. It is important that this role
// is recorded with a new `name` in the `roles` list.
// Follow up roles update will set the new password.
@@ -179,7 +182,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
xact.execute(query.as_str(), &[])?;
let grant_query = format!(
"GRANT pg_read_all_data, pg_write_all_data TO {}",
"grant pg_read_all_data, pg_write_all_data to {}",
name.quote()
);
xact.execute(grant_query.as_str(), &[])?;
@@ -194,68 +197,6 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
Ok(())
}
/// Reassign all dependent objects and delete requested roles.
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
let spec = &node.spec;
// First, reassign all dependent objects to db owners.
if let Some(ops) = &spec.delta_operations {
info!("reassigning dependent objects of to-be-deleted roles");
for op in ops {
if op.action == "delete_role" {
reassign_owned_objects(node, &op.name)?;
}
}
}
// Second, proceed with role deletions.
let mut xact = client.transaction()?;
if let Some(ops) = &spec.delta_operations {
info!("processing role deletions");
for op in ops {
// We do not check either role exists or not,
// Postgres will take care of it for us
if op.action == "delete_role" {
let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.quote());
warn!("deleting role '{}'", &op.name);
xact.execute(query.as_str(), &[])?;
}
}
}
Ok(())
}
// Reassign all owned objects in all databases to the owner of the database.
fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> {
for db in &node.spec.cluster.databases {
if db.owner != *role_name {
let db_name_encoded = format!("/{}", encode(&db.name));
let db_connstr = node.connstr.replacen("/postgres", &db_name_encoded, 1);
let mut client = Client::connect(&db_connstr, NoTls)?;
// This will reassign all dependent objects to the db owner
let reassign_query = format!(
"REASSIGN OWNED BY {} TO {}",
role_name.quote(),
db.owner.quote()
);
info!(
"reassigning objects owned by '{}' in db '{}' to '{}'",
role_name, &db.name, &db.owner
);
client.simple_query(&reassign_query)?;
// This now will only drop privileges of the role
let drop_query = format!("DROP OWNED BY {}", role_name.quote());
client.simple_query(&drop_query)?;
}
}
Ok(())
}
/// It follows mostly the same logic as `handle_roles()` excepting that we
/// does not use an explicit transactions block, since major database operations
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
@@ -353,26 +294,13 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
pub fn handle_grants(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
info!("cluster spec grants:");
// We now have a separate `web_access` role to connect to the database
// via the web interface and proxy link auth. And also we grant a
// read / write all data privilege to every role. So also grant
// create to everyone.
// XXX: later we should stop messing with Postgres ACL in such horrible
// ways.
let roles = spec
.cluster
.roles
.iter()
.map(|r| r.name.quote())
.collect::<Vec<_>>();
for db in &spec.cluster.databases {
let dbname = &db.name;
let query: String = format!(
"GRANT CREATE ON DATABASE {} TO {}",
dbname.quote(),
roles.join(", ")
db.owner.quote()
);
info!("grant query {}", &query);

View File

@@ -6,13 +6,17 @@ pub mod subscription_key;
/// All broker values, possible to use when dealing with etcd.
pub mod subscription_value;
use std::str::FromStr;
use std::{
collections::{hash_map, HashMap},
str::FromStr,
};
use serde::de::DeserializeOwned;
use subscription_key::SubscriptionKey;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::*;
use utils::zid::{NodeId, ZTenantTimelineId};
use crate::subscription_key::SubscriptionFullKey;
@@ -24,17 +28,18 @@ pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon";
/// A way to control the data retrieval from a certain subscription.
pub struct BrokerSubscription<V> {
/// An unbounded channel to fetch the relevant etcd updates from.
pub value_updates: mpsc::UnboundedReceiver<BrokerUpdate<V>>,
value_updates: mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>>,
key: SubscriptionKey,
/// A subscription task handle, to allow waiting on it for the task to complete.
/// Both the updates channel and the handle require `&mut`, so it's better to keep
/// both `pub` to allow using both in the same structures without borrow checker complaining.
pub watcher_handle: JoinHandle<Result<(), BrokerError>>,
watcher_handle: JoinHandle<Result<(), BrokerError>>,
watcher: Watcher,
}
impl<V> BrokerSubscription<V> {
/// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet.
pub async fn fetch_data(&mut self) -> Option<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>> {
self.value_updates.recv().await
}
/// Cancels the subscription, stopping the data poller and waiting for it to shut down.
pub async fn cancel(mut self) -> Result<(), BrokerError> {
self.watcher.cancel().await.map_err(|e| {
@@ -43,41 +48,15 @@ impl<V> BrokerSubscription<V> {
format!("Failed to cancel broker subscription, kind: {:?}", self.key),
)
})?;
match (&mut self.watcher_handle).await {
Ok(res) => res,
Err(e) => {
if e.is_cancelled() {
// don't error on the tasks that are cancelled already
Ok(())
} else {
Err(BrokerError::InternalError(format!(
"Panicked during broker subscription task, kind: {:?}, error: {e}",
self.key
)))
}
}
}
self.watcher_handle.await.map_err(|e| {
BrokerError::InternalError(format!(
"Failed to join the broker value updates task, kind: {:?}, error: {e}",
self.key
))
})?
}
}
impl<V> Drop for BrokerSubscription<V> {
fn drop(&mut self) {
// we poll data from etcd into the channel in the same struct, so if the whole struct gets dropped,
// no more data is used by the receiver and it's safe to cancel and drop the whole etcd subscription task.
self.watcher_handle.abort();
}
}
/// An update from the etcd broker.
pub struct BrokerUpdate<V> {
/// Etcd generation version, the bigger the more actual the data is.
pub etcd_version: i64,
/// Etcd key for the corresponding value, parsed from the broker KV.
pub key: SubscriptionFullKey,
/// Current etcd value, parsed from the broker KV.
pub value: V,
}
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
#[error("Etcd client error: {0}. Context: {1}")]
@@ -145,21 +124,41 @@ where
break;
}
let mut value_updates: HashMap<ZTenantTimelineId, HashMap<NodeId, V>> = HashMap::new();
// Keep track that the timeline data updates from etcd arrive in the right order.
// https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
// > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering.
let mut value_etcd_versions: HashMap<ZTenantTimelineId, i64> = HashMap::new();
let events = resp.events();
debug!("Processing {} events", events.len());
for event in events {
if EventType::Put == event.event_type() {
if let Some(new_etcd_kv) = event.kv() {
let new_kv_version = new_etcd_kv.version();
match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) {
Ok(Some((key, value))) => if let Err(e) = value_updates_sender.send(BrokerUpdate {
etcd_version: new_etcd_kv.version(),
key,
value,
}) {
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
break;
},
Ok(Some((key, value))) => match value_updates
.entry(key.id)
.or_default()
.entry(key.node_id)
{
hash_map::Entry::Occupied(mut o) => {
let old_etcd_kv_version = value_etcd_versions.get(&key.id).copied().unwrap_or(i64::MIN);
if old_etcd_kv_version < new_kv_version {
o.insert(value);
value_etcd_versions.insert(key.id,new_kv_version);
} else {
debug!("Skipping etcd timeline update due to older version compared to one that's already stored");
}
}
hash_map::Entry::Vacant(v) => {
v.insert(value);
value_etcd_versions.insert(key.id,new_kv_version);
}
},
Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"),
Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"),
Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"),
@@ -167,6 +166,13 @@ where
}
}
}
if !value_updates.is_empty() {
if let Err(e) = value_updates_sender.send(value_updates) {
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
break;
}
}
}
Ok(())

View File

@@ -263,6 +263,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
// start profiler (if enabled)
let profiler_guard = profiling::init_profiler(conf);
pageserver::tenant_tasks::init_tenant_task_pool()?;
// initialize authentication for incoming connections
let auth = match &conf.auth_type {
AuthType::Trust | AuthType::MD5 => None,

View File

@@ -16,7 +16,6 @@ use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
@@ -39,7 +38,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
// Then fishing out pg_control would be unnecessary
let mut modification = tline.begin_modification();
let mut modification = tline.begin_modification(lsn);
modification.init_empty()?;
// Import all but pg_wal
@@ -62,7 +61,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
}
// We're done importing all the data files.
modification.commit(lsn)?;
modification.commit()?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -268,11 +267,9 @@ fn import_wal<R: Repository>(
waldecoder.feed_bytes(&buf);
let mut nrecords = 0;
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
walingest.ingest_record(tline, recdata, lsn)?;
last_lsn = lsn;
nrecords += 1;
@@ -302,7 +299,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
let mut modification = tline.begin_modification();
let mut modification = tline.begin_modification(base_lsn);
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
@@ -333,7 +330,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit(base_lsn)?;
modification.commit()?;
Ok(())
}
@@ -385,11 +382,9 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
walingest.ingest_record(tline, recdata, lsn)?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);

View File

@@ -39,7 +39,8 @@ use crate::storage_sync::index::RemoteIndex;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::repository::{
GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter,
GcResult, RepoIoError, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate,
TimelineWriter,
};
use crate::repository::{Key, Value};
use crate::tenant_mgr;
@@ -158,6 +159,11 @@ pub struct LayeredRepository {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
// Freezing the repo disallows any writes to its directory.
//
// Any writers must hold frozen.try_read() while writing.
pub frozen: RwLock<bool>,
// Overridden tenant-specific config parameters.
// We keep TenantConfOpt sturct here to preserve the information
// about parameters that are not set.
@@ -319,19 +325,38 @@ impl Repository for LayeredRepository {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
) -> Result<GcResult, RepoIoError> {
let timeline_str = target_timelineid
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
// Make sure repo is not frozen
let guard = match self.frozen.try_read() {
Ok(g) => g,
Err(_) => return Err(RepoIoError::RepoFreezingError),
};
if *guard {
return Err(RepoIoError::RepoFrozenError);
}
STORAGE_TIME
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
.observe_closure_duration(|| {
self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
})
.map_err(|err| err.into())
}
fn compaction_iteration(&self) -> Result<()> {
fn compaction_iteration(&self) -> Result<(), RepoIoError> {
// Make sure repo is not frozen
let guard = match self.frozen.try_read() {
Ok(g) => g,
Err(_) => return Err(RepoIoError::RepoFreezingError),
};
if *guard {
return Err(RepoIoError::RepoFrozenError);
}
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -685,6 +710,7 @@ impl LayeredRepository {
) -> LayeredRepository {
LayeredRepository {
tenant_id,
frozen: RwLock::new(false),
conf,
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),

View File

@@ -660,21 +660,11 @@ impl DeltaLayerWriter {
/// The values must be appended in key, lsn order.
///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
}
pub fn put_value_bytes(
&mut self,
key: Key,
lsn: Lsn,
val: &[u8],
will_init: bool,
) -> Result<()> {
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(val)?;
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let blob_ref = BlobRef::new(off, will_init);
let blob_ref = BlobRef::new(off, val.will_init());
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
self.tree.append(&delta_key.0, blob_ref.0)?;

View File

@@ -28,7 +28,7 @@ use utils::{
use std::fmt::Write as _;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Mutex, RwLock};
use std::sync::RwLock;
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -41,10 +41,6 @@ pub struct InMemoryLayer {
///
start_lsn: Lsn,
/// A buffer for serializing object during [`InMemoryLayer::put_value`].
/// This buffer is reused for each serialization to avoid additional malloc calls.
ser_buffer: Mutex<Vec<u8>>,
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: RwLock<InMemoryLayerInner>,
@@ -259,7 +255,6 @@ impl InMemoryLayer {
timelineid,
tenantid,
start_lsn,
ser_buffer: Mutex::new(Vec::new()),
inner: RwLock::new(InMemoryLayerInner {
end_lsn: None,
index: HashMap::new(),
@@ -275,15 +270,10 @@ impl InMemoryLayer {
pub fn put_value(&self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timelineid, lsn);
let mut inner = self.inner.write().unwrap();
inner.assert_writeable();
let off = {
let mut buf = self.ser_buffer.lock().unwrap();
val.ser_into(&mut (*buf))?;
let off = inner.file.write_blob(&buf)?;
buf.clear();
off
};
let off = inner.file.write_blob(&Value::ser(&val)?)?;
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
@@ -352,8 +342,8 @@ impl InMemoryLayer {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
let val = Value::des(&buf)?;
delta_layer_writer.put_value(key, *lsn, val)?;
}
}

View File

@@ -13,7 +13,7 @@ pub mod repository;
pub mod storage_sync;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod tenant_tasks;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;

View File

@@ -733,10 +733,17 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let total_blocks =
timeline.get_db_size(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
let all_rels = timeline.list_rels(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
let mut total_blocks: i64 = 0;
let db_size = total_blocks as i64 * pg_constants::BLCKSZ as i64;
for rel in all_rels {
if rel.forknum == 0 {
let n_blocks = timeline.get_rel_size(rel, lsn).unwrap_or(0);
total_blocks += n_blocks as i64;
}
}
let db_size = total_blocks * pg_constants::BLCKSZ as i64;
Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
db_size,

View File

@@ -79,25 +79,23 @@ impl<R: Repository> DatadirTimeline<R> {
/// the timeline.
///
/// This provides a transaction-like interface to perform a bunch
/// of modifications atomically.
/// of modifications atomically, all stamped with one LSN.
///
/// To ingest a WAL record, call begin_modification() to get a
/// To ingest a WAL record, call begin_modification(lsn) to get a
/// DatadirModification object. Use the functions in the object to
/// modify the repository state, updating all the pages and metadata
/// that the WAL record affects. When you're done, call commit(lsn) to
/// commit the changes. All the changes will be stamped with the specified LSN.
///
/// Calling commit(lsn) will flush all the changes and reset the state,
/// so the `DatadirModification` struct can be reused to perform the next modification.
/// that the WAL record affects. When you're done, call commit() to
/// commit the changes.
///
/// Note that any pending modifications you make through the
/// modification object won't be visible to calls to the 'get' and list
/// functions of the timeline until you finish! And if you update the
/// same page twice, the last update wins.
///
pub fn begin_modification(&self) -> DatadirModification<R> {
pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification<R> {
DatadirModification {
tline: self,
lsn,
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
@@ -125,19 +123,6 @@ impl<R: Repository> DatadirTimeline<R> {
self.tline.get(key, lsn)
}
// Get size of a database in blocks
pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
let mut total_blocks = 0;
let rels = self.list_rels(spcnode, dbnode, lsn)?;
for rel in rels {
let n_blocks = self.get_rel_size(rel, lsn)?;
total_blocks += n_blocks as usize;
}
Ok(total_blocks)
}
/// Get size of a relation file
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
ensure!(tag.relnode != 0, "invalid relnode");
@@ -534,6 +519,8 @@ pub struct DatadirModification<'a, R: Repository> {
/// in the state in 'tline' yet.
pub tline: &'a DatadirTimeline<R>,
lsn: Lsn,
// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
@@ -680,10 +667,6 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
}
pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> Result<()> {
let req_lsn = self.tline.get_last_record_lsn();
let total_blocks = self.tline.get_db_size(spcnode, dbnode, req_lsn)?;
// Remove entry from dbdir
let buf = self.get(DBDIR_KEY)?;
let mut dir = DbDirectory::des(&buf)?;
@@ -697,8 +680,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
);
}
// Update logical database size.
self.pending_nblocks -= total_blocks as isize;
// FIXME: update pending_nblocks
// Delete all relations and metadata files for the spcnode/dnode
self.delete(dbdir_key_range(spcnode, dbnode));
@@ -904,22 +886,20 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
///
/// Finish this atomic update, writing all the updated keys to the
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
pub fn commit(self) -> Result<()> {
let writer = self.tline.tline.writer();
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, value)?;
for (key, value) in self.pending_updates {
writer.put(key, self.lsn, value)?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn)?;
for key_range in self.pending_deletions {
writer.delete(key_range.clone(), self.lsn)?;
}
writer.finish_write(lsn);
writer.finish_write(self.lsn);
if pending_nblocks != 0 {
self.tline.current_logical_size.fetch_add(
@@ -1347,9 +1327,9 @@ pub fn create_test_timeline<R: Repository>(
) -> Result<Arc<crate::DatadirTimeline<R>>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = DatadirTimeline::new(tline, 256 * 1024);
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit(Lsn(8))?;
m.commit()?;
Ok(Arc::new(tline))
}

View File

@@ -196,6 +196,19 @@ impl Display for TimelineSyncStatusUpdate {
}
}
#[derive(Debug, thiserror::Error)]
pub enum RepoIoError {
#[error("Cannot write to repo path while repo is frozen")]
RepoFrozenError,
#[error("Cannot write to repo path while repo is being frozen")]
RepoFreezingError,
/// Unstructured anyhow error
#[error(transparent)]
Other(#[from] anyhow::Error),
}
///
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
@@ -252,13 +265,13 @@ pub trait Repository: Send + Sync {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult>;
) -> Result<GcResult, RepoIoError>;
/// Perform one compaction iteration.
/// This function is periodically called by compactor thread.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
fn compaction_iteration(&self) -> Result<()>;
fn compaction_iteration(&self) -> Result<(), RepoIoError>;
/// detaches timeline-related in-memory data.
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;

View File

@@ -37,7 +37,7 @@ pub mod defaults {
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10_000;
}
/// Per-tenant configuration options

View File

@@ -230,8 +230,6 @@ pub fn shutdown_all_tenants() {
drop(m);
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
// Ok, no background threads running anymore. Flush any remaining data in
// memory to disk.
@@ -330,44 +328,15 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
}
(TenantState::Idle, TenantState::Active) => {
info!("activating tenant {tenant_id}");
let compactor_spawn_result = thread_mgr::spawn(
ThreadKind::Compactor,
Some(tenant_id),
None,
"Compactor thread",
false,
move || crate::tenant_threads::compact_loop(tenant_id),
);
if compactor_spawn_result.is_err() {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
}
compactor_spawn_result?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenant_id),
None,
"GC thread",
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
// Unfreeze the repo, allowing gc/compaction jobs to run
let repo = get_repository_for_tenant(tenant_id)?;
*repo.frozen.write().unwrap() = false;
if let Err(e) = &gc_spawn_result {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
// Spawn gc and compaction loops. The loops will shut themselves
// down when tenant becomes idle and repo becomes frozen.
crate::tenant_tasks::start_compaction_loop(tenant_id)?;
crate::tenant_tasks::start_gc_loop(tenant_id)?;
}
(TenantState::Idle, TenantState::Stopping) => {
info!("stopping idle tenant {tenant_id}");
@@ -379,8 +348,10 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
Some(tenant_id),
None,
);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
// Freeze the repo, waiting for existing gc/compaction to finish
let repo = get_repository_for_tenant(tenant_id)?;
*repo.frozen.write().unwrap() = true;
}
}

View File

@@ -0,0 +1,271 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::time::Duration;
use crate::repository::{RepoIoError, Repository};
use crate::thread_mgr::ThreadKind;
use crate::{tenant_mgr, thread_mgr};
use anyhow::{self, Context};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::{Lazy, OnceCell};
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::*;
use utils::zid::ZTenantId;
static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"tenant_task_events",
"Number of task start/stop/fail events.",
&["event"],
)
.expect("Failed to register tenant_task_events metric")
});
///
/// Compaction task's main loop
///
async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
// Run blocking part of the task
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
match repo.compaction_iteration() {
Ok(_) => Ok(ControlFlow::Continue(compaction_period)),
Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => {
Ok(ControlFlow::Break(()))
}
Err(RepoIoError::Other(e)) => Err(e),
}
})
.await;
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => {
error!("Compaction failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Compaction join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
trace!(
"compaction loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
}
static START_GC_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
static START_COMPACTION_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
/// Spawn a task that will periodically schedule garbage collection until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
START_GC_LOOP
.get()
.context("Failed to get START_GC_LOOP")?
.blocking_send(tenantid)
.context("Failed to send to START_GC_LOOP channel")?;
Ok(())
}
/// Spawn a task that will periodically schedule compaction until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
START_COMPACTION_LOOP
.get()
.context("failed to get START_COMPACTION_LOOP")?
.blocking_send(tenantid)
.context("failed to send to START_COMPACTION_LOOP")?;
Ok(())
}
/// Spawn the TenantTaskManager
/// This needs to be called before start_gc_loop or start_compaction_loop
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("tenant-task-worker")
.worker_threads(40) // Way more than necessary
.max_blocking_threads(100) // Way more than necessary
.enable_all()
.build()?;
let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100);
START_GC_LOOP
.set(gc_send)
.expect("Failed to set START_GC_LOOP");
let (compaction_send, mut compaction_recv) = mpsc::channel::<ZTenantId>(100);
START_COMPACTION_LOOP
.set(compaction_send)
.expect("Failed to set START_COMPACTION_LOOP");
// TODO this is getting repetitive
let mut gc_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
let mut compaction_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
thread_mgr::spawn(
ThreadKind::TenantTaskManager,
None,
None,
"Tenant task manager main thread",
true,
move || {
runtime.block_on(async move {
let mut futures = FuturesUnordered::new();
loop {
tokio::select! {
_ = thread_mgr::shutdown_watcher() => {
// Send cancellation to all tasks
for (_, cancel) in gc_loops.drain() {
cancel.send(()).ok();
}
for (_, cancel) in compaction_loops.drain() {
cancel.send(()).ok();
}
// Exit after all tasks finish
while let Some(result) = futures.next().await {
match result {
Ok(()) => {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
},
Err(e) => {
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
error!("loop join error {}", e)
},
}
}
break;
},
tenantid = gc_recv.recv() => {
let tenantid = tenantid.expect("Gc task channel closed unexpectedly");
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(gc_loop(tenantid, cancel_recv)
.instrument(trace_span!("gc loop", tenant = %tenantid)));
if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
tenantid = compaction_recv.recv() => {
let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)
.instrument(trace_span!("compaction loop", tenant = %tenantid)));
if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
result = futures.next() => {
// Log and count any unhandled panics
match result {
Some(Ok(())) => {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
},
Some(Err(e)) => {
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
error!("loop join error {}", e)
},
None => {},
};
},
}
}
});
Ok(())
},
)?;
Ok(())
}
///
/// GC task's main loop
///
async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
// Run blocking part of the task
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_period = repo.get_gc_period();
let gc_horizon = repo.get_gc_horizon();
if gc_horizon > 0 {
match repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false) {
Ok(_) => return Ok(ControlFlow::Continue(gc_period)),
Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => {
return Ok(ControlFlow::Break(()))
}
Err(RepoIoError::Other(e)) => return Err(e),
}
}
Ok(ControlFlow::Continue(gc_period))
})
.await;
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => {
error!("Gc failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Gc join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
trace!(
"GC loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
}

View File

@@ -1,79 +0,0 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
Ok(())
}
}
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
std::thread::sleep(compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.compaction_iteration()?;
}
trace!(
"compaction thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_horizon = repo.get_gc_horizon();
// Garbage collect old files that are not needed for PITR anymore
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = repo.get_gc_period().as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
}
trace!(
"GC thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}

View File

@@ -94,11 +94,8 @@ pub enum ThreadKind {
// Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL.
WalReceiverManager,
// Thread that handles compaction of all timelines for a tenant.
Compactor,
// Thread that handles GC of a tenant
GarbageCollector,
// Thread that schedules new compaction and gc jobs
TenantTaskManager,
// Thread that flushes frozen in-memory layers to disk
LayerFlushThread,

View File

@@ -78,13 +78,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
///
pub fn ingest_record(
&mut self,
timeline: &DatadirTimeline<R>,
recdata: Bytes,
lsn: Lsn,
modification: &mut DatadirModification<R>,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
let mut modification = timeline.begin_modification(lsn);
let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -98,7 +98,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
{
self.ingest_heapam_record(&mut buf, modification, decoded)?;
self.ingest_heapam_record(&mut buf, &mut modification, &mut decoded)?;
}
// Handle other special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
@@ -106,19 +106,19 @@ impl<'a, R: Repository> WalIngest<'a, R> {
== pg_constants::XLOG_SMGR_CREATE
{
let create = XlSmgrCreate::decode(&mut buf);
self.ingest_xlog_smgr_create(modification, &create)?;
self.ingest_xlog_smgr_create(&mut modification, &create)?;
} else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate)?;
self.ingest_xlog_smgr_truncate(&mut modification, &truncate)?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb)?;
self.ingest_xlog_dbase_create(&mut modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_DROP
{
@@ -137,7 +137,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
&mut modification,
SlruKind::Clog,
segno,
rpageno,
@@ -146,7 +146,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(modification, &xlrec)?;
self.ingest_clog_truncate_record(&mut modification, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
@@ -154,7 +154,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&mut modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
)?;
@@ -164,7 +164,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&mut modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
)?;
@@ -187,7 +187,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
&mut modification,
SlruKind::MultiXactOffsets,
segno,
rpageno,
@@ -198,7 +198,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
&mut modification,
SlruKind::MultiXactMembers,
segno,
rpageno,
@@ -206,14 +206,14 @@ impl<'a, R: Repository> WalIngest<'a, R> {
)?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
self.ingest_multixact_create_record(modification, &xlrec)?;
self.ingest_multixact_create_record(&mut modification, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(modification, &xlrec)?;
self.ingest_multixact_truncate_record(&mut modification, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, decoded)?;
self.ingest_relmap_page(&mut modification, &xlrec, &decoded)?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
@@ -248,7 +248,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
self.ingest_decoded_block(modification, lsn, decoded, blk)?;
self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
}
// If checkpoint data was updated, store the new version in the repository
@@ -261,7 +261,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit(lsn)?;
modification.commit()?;
Ok(())
}
@@ -1069,10 +1069,10 @@ mod tests {
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
m.commit(Lsn(0x10))?;
m.commit()?;
let walingest = WalIngest::new(tline, Lsn(0x10))?;
Ok(walingest)
@@ -1084,19 +1084,19 @@ mod tests {
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x20));
walingest.put_rel_creation(&mut m, TESTREL_A)?;
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit(Lsn(0x20))?;
let mut m = tline.begin_modification();
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))?;
m.commit(Lsn(0x30))?;
let mut m = tline.begin_modification();
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))?;
m.commit(Lsn(0x40))?;
let mut m = tline.begin_modification();
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?;
m.commit(Lsn(0x50))?;
m.commit()?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1142,9 +1142,9 @@ mod tests {
);
// Truncate last block
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x60));
walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
m.commit(Lsn(0x60))?;
m.commit()?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1166,15 +1166,15 @@ mod tests {
);
// Truncate to zero length
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x68));
walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?;
m.commit(Lsn(0x68))?;
m.commit()?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0);
// Extend from 0 to 2 blocks, leaving a gap
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x70));
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?;
m.commit(Lsn(0x70))?;
m.commit()?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2);
assert_eq!(
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?,
@@ -1186,9 +1186,9 @@ mod tests {
);
// Extend a lot more, leaving a big gap that spans across segments
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x80));
walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?;
m.commit(Lsn(0x80))?;
m.commit()?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501);
for blk in 2..1500 {
assert_eq!(
@@ -1212,18 +1212,18 @@ mod tests {
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x20));
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit(Lsn(0x20))?;
m.commit()?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1);
// Drop rel
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A)?;
m.commit(Lsn(0x30))?;
m.commit()?;
// Check that rel is not visible anymore
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false);
@@ -1232,9 +1232,9 @@ mod tests {
//assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none());
// Re-create it
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x40));
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))?;
m.commit(Lsn(0x40))?;
m.commit()?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true);
@@ -1254,12 +1254,12 @@ mod tests {
// Create a 20 MB relation (the size is arbitrary)
let relsize = 20 * 1024 * 1024 / 8192;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x20));
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit(Lsn(0x20))?;
m.commit()?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
@@ -1280,9 +1280,9 @@ mod tests {
// Truncate relation so that second segment was dropped
// - only leave one page
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x60));
walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?;
m.commit(Lsn(0x60))?;
m.commit()?;
// Check reported size and contents after truncation
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1);
@@ -1310,12 +1310,12 @@ mod tests {
// Extend relation again.
// Add enough blocks to create second segment
let lsn = Lsn(0x80);
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(lsn);
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, lsn);
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit(lsn)?;
m.commit()?;
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize);
@@ -1343,10 +1343,10 @@ mod tests {
let mut lsn = 0x10;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
lsn += 0x10;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(lsn));
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
walingest.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)?;
m.commit(Lsn(lsn))?;
m.commit()?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1358,9 +1358,9 @@ mod tests {
// Truncate one block
lsn += 0x10;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?;
m.commit(Lsn(lsn))?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE
@@ -1369,9 +1369,9 @@ mod tests {
// Truncate another block
lsn += 0x10;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?;
m.commit(Lsn(lsn))?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE - 1
@@ -1383,9 +1383,9 @@ mod tests {
let mut size: i32 = 3000;
while size >= 0 {
lsn += 0x10;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?;
m.commit(Lsn(lsn))?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
size as BlockNumber

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
//! Actual Postgres connection handler to stream WAL to the server.
//! Runs as a separate, cancellable Tokio task.
use std::{
str::FromStr,
sync::Arc,
@@ -10,30 +10,113 @@ use anyhow::{bail, ensure, Context};
use bytes::BytesMut;
use fail::fail_point;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utils::{
lsn::Lsn,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantTimelineId},
};
use super::TaskEvent;
use crate::{
http::models::WalReceiverEntry,
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
/// Opens a conneciton to the given wal producer and streams the WAL, sending progress messages during streaming.
pub async fn handle_walreceiver_connection(
#[derive(Debug, Clone)]
pub enum WalConnectionEvent {
Started,
NewWal(ReplicationFeedback),
End(Result<(), String>),
}
/// A wrapper around standalone Tokio task, to poll its updates or cancel the task.
#[derive(Debug)]
pub struct WalReceiverConnection {
handle: tokio::task::JoinHandle<()>,
cancellation: watch::Sender<()>,
events_receiver: watch::Receiver<WalConnectionEvent>,
}
impl WalReceiverConnection {
/// Initializes the connection task, returning a set of handles on top of it.
/// The task is started immediately after the creation, fails if no connection is established during the timeout given.
pub fn open(
id: ZTenantTimelineId,
safekeeper_id: NodeId,
wal_producer_connstr: String,
connect_timeout: Duration,
) -> Self {
let (cancellation, mut cancellation_receiver) = watch::channel(());
let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started);
let handle = tokio::spawn(
async move {
let connection_result = handle_walreceiver_connection(
id,
&wal_producer_connstr,
&events_sender,
&mut cancellation_receiver,
connect_timeout,
)
.await
.map_err(|e| {
format!("Walreceiver connection for id {id} failed with error: {e:#}")
});
match &connection_result {
Ok(()) => {
debug!("Walreceiver connection for id {id} ended successfully")
}
Err(e) => warn!("{e}"),
}
events_sender
.send(WalConnectionEvent::End(connection_result))
.ok();
}
.instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)),
);
Self {
handle,
cancellation,
events_receiver,
}
}
/// Polls for the next WAL receiver event, if there's any available since the last check.
/// Blocks if there's no new event available, returns `None` if no new events will ever occur.
/// Only the last event is returned, all events received between observatins are lost.
pub async fn next_event(&mut self) -> Option<WalConnectionEvent> {
match self.events_receiver.changed().await {
Ok(()) => Some(self.events_receiver.borrow().clone()),
Err(_cancellation_error) => None,
}
}
/// Gracefully aborts current WAL streaming task, waiting for the current WAL streamed.
pub async fn shutdown(&mut self) -> anyhow::Result<()> {
self.cancellation.send(()).ok();
let handle = &mut self.handle;
handle
.await
.context("Failed to join on a walreceiver connection task")?;
Ok(())
}
}
async fn handle_walreceiver_connection(
id: ZTenantTimelineId,
wal_producer_connstr: &str,
events_sender: &watch::Sender<TaskEvent<ReplicationFeedback>>,
mut cancellation: watch::Receiver<()>,
events_sender: &watch::Sender<WalConnectionEvent>,
cancellation: &mut watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
// Connect to the database in replication mode.
@@ -131,6 +214,8 @@ pub async fn handle_walreceiver_connection(
while let Some(replication_message) = {
select! {
// check for shutdown first
biased;
_ = cancellation.changed() => {
info!("walreceiver interrupted");
None
@@ -151,25 +236,19 @@ pub async fn handle_walreceiver_connection(
waldecoder.feed_bytes(data);
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// let _enter = info_span!("processing record", lsn = %lsn).entered();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let _enter = info_span!("processing record", lsn = %lsn).entered();
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
.context("could not ingest record at {lsn}")?;
walingest.ingest_record(&timeline, recdata, lsn)?;
fail_point!("walreceiver-after-ingest");
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
}
last_rec_lsn = lsn;
}
if !caught_up && endlsn >= end_of_wal {
@@ -265,7 +344,7 @@ pub async fn handle_walreceiver_connection(
.as_mut()
.zenith_status_update(data.len() as u64, &data)
.await?;
if let Err(e) = events_sender.send(TaskEvent::NewEvent(zenith_status_update)) {
if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}

File diff suppressed because it is too large Load Diff

View File

@@ -96,7 +96,6 @@ impl DecodedBkpBlock {
}
}
#[derive(Default)]
pub struct DecodedWALRecord {
pub xl_xid: TransactionId,
pub xl_info: u8,
@@ -506,10 +505,7 @@ impl XlMultiXactTruncate {
// block data
// ...
// main data
pub fn decode_wal_record(
record: Bytes,
decoded: &mut DecodedWALRecord,
) -> Result<(), DeserializeError> {
pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeError> {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
@@ -538,9 +534,7 @@ pub fn decode_wal_record(
let mut blocks_total_len: u32 = 0;
let mut main_data_len = 0;
let mut datatotal: u32 = 0;
if !decoded.blocks.is_empty() {
decoded.blocks.clear();
}
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
// 2. Decode the headers.
// XLogRecordBlockHeaders if any,
@@ -719,7 +713,7 @@ pub fn decode_wal_record(
blk.blkno
);
decoded.blocks.push(blk);
blocks.push(blk);
}
_ => {
@@ -730,7 +724,7 @@ pub fn decode_wal_record(
// 3. Decode blocks.
let mut ptr = record.len() - buf.remaining();
for blk in decoded.blocks.iter_mut() {
for blk in blocks.iter_mut() {
if blk.has_image {
blk.bimg_offset = ptr as u32;
ptr += blk.bimg_len as usize;
@@ -750,13 +744,14 @@ pub fn decode_wal_record(
assert_eq!(buf.remaining(), main_data_len as usize);
}
decoded.xl_xid = xlogrec.xl_xid;
decoded.xl_info = xlogrec.xl_info;
decoded.xl_rmid = xlogrec.xl_rmid;
decoded.record = record;
decoded.main_data_offset = main_data_offset;
Ok(())
Ok(DecodedWALRecord {
xl_xid: xlogrec.xl_xid,
xl_info: xlogrec.xl_info,
xl_rmid: xlogrec.xl_rmid,
record,
blocks,
main_data_offset,
})
}
///

View File

@@ -49,12 +49,6 @@ impl UserFacingError for ConsoleAuthError {
}
}
impl From<&auth::credentials::ClientCredsParseError> for ConsoleAuthError {
fn from(e: &auth::credentials::ClientCredsParseError) -> Self {
ConsoleAuthError::BadProjectName(e.clone())
}
}
// TODO: convert into an enum with "error"
#[derive(Serialize, Deserialize, Debug)]
struct GetRoleSecretResponse {
@@ -100,7 +94,7 @@ impl<'a> Api<'a> {
let mut url = self.endpoint.clone();
url.path_segments_mut().push("proxy_get_role_secret");
url.query_pairs_mut()
.append_pair("project", self.creds.project_name.as_ref()?)
.append_pair("project", &self.creds.project_name)
.append_pair("role", &self.creds.user);
// TODO: use a proper logger
@@ -123,8 +117,8 @@ impl<'a> Api<'a> {
async fn wake_compute(&self) -> Result<DatabaseInfo> {
let mut url = self.endpoint.clone();
url.path_segments_mut().push("proxy_wake_compute");
let project_name = self.creds.project_name.as_ref()?;
url.query_pairs_mut().append_pair("project", project_name);
url.query_pairs_mut()
.append_pair("project", &self.creds.project_name);
// TODO: use a proper logger
println!("cplane request: {url}");

View File

@@ -8,7 +8,7 @@ use std::collections::HashMap;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug, Error, PartialEq, Eq, Clone)]
#[derive(Debug, Error, PartialEq)]
pub enum ClientCredsParseError {
#[error("Parameter `{0}` is missing in startup packet.")]
MissingKey(&'static str),
@@ -44,7 +44,7 @@ impl UserFacingError for ClientCredsParseError {}
pub struct ClientCredentials {
pub user: String,
pub dbname: String,
pub project_name: Result<String, ClientCredsParseError>,
pub project_name: String,
}
impl ClientCredentials {
@@ -67,7 +67,7 @@ impl ClientCredentials {
let user = get_param("user")?;
let dbname = get_param("database")?;
let project_name = get_param("project").ok();
let project_name = get_project_name(sni_data, common_name, project_name.as_deref());
let project_name = get_project_name(sni_data, common_name, project_name.as_deref())?;
Ok(Self {
user,

View File

@@ -5,11 +5,6 @@ use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use etcd_broker::subscription_value::SkTimelineInfo;
use etcd_broker::LeaseKeepAliveStream;
use etcd_broker::LeaseKeeper;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
@@ -26,7 +21,7 @@ use utils::zid::{NodeId, ZTenantTimelineId};
const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;
const LEASE_TTL_SEC: i64 = 10;
const LEASE_TTL_SEC: i64 = 5;
pub fn thread_main(conf: SafeKeeperConf) {
let runtime = runtime::Builder::new_current_thread()
@@ -159,48 +154,13 @@ pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{system_id}")
}
async fn push_sk_info(
zttid: ZTenantTimelineId,
mut client: Client,
key: String,
sk_info: SkTimelineInfo,
mut lease: Lease,
) -> anyhow::Result<(ZTenantTimelineId, Lease)> {
let put_opts = PutOptions::new().with_lease(lease.id);
client
.put(
key.clone(),
serde_json::to_string(&sk_info)?,
Some(put_opts),
)
.await
.with_context(|| format!("failed to push safekeeper info to {}", key))?;
// revive the lease
lease
.keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
lease
.ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
Ok((zttid, lease))
}
struct Lease {
id: i64,
keeper: LeaseKeeper,
ka_stream: LeaseKeepAliveStream,
}
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
let mut leases: HashMap<ZTenantTimelineId, Lease> = HashMap::new();
// Get and maintain lease to automatically delete obsolete data
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
let (mut keeper, mut ka_stream) = client.lease_keep_alive(lease.id()).await?;
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
loop {
@@ -208,46 +168,33 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let active_tlis = GlobalTimelines::get_active_timelines();
// // Get and maintain (if not yet) per timeline lease to automatically delete obsolete data.
for zttid in active_tlis.iter() {
if let Entry::Vacant(v) = leases.entry(*zttid) {
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
let (keeper, ka_stream) = client.lease_keep_alive(lease.id()).await?;
v.insert(Lease {
id: lease.id(),
keeper,
ka_stream,
});
for zttid in GlobalTimelines::get_active_timelines() {
if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
let sk_info = tli.get_public_info(&conf)?;
let put_opts = PutOptions::new().with_lease(lease.id());
client
.put(
timeline_safekeeper_path(
conf.broker_etcd_prefix.clone(),
zttid,
conf.my_id,
),
serde_json::to_string(&sk_info)?,
Some(put_opts),
)
.await
.context("failed to push safekeeper info")?;
}
}
leases.retain(|zttid, _| active_tlis.contains(zttid));
// Push data concurrently to not suffer from latency, with many timelines it can be slow.
let handles = active_tlis
.iter()
.filter_map(|zttid| GlobalTimelines::get_loaded(*zttid))
.map(|tli| {
let sk_info = tli.get_public_info(&conf);
let key = timeline_safekeeper_path(
conf.broker_etcd_prefix.clone(),
tli.zttid,
conf.my_id,
);
let lease = leases.remove(&tli.zttid).unwrap();
tokio::spawn(push_sk_info(tli.zttid, client.clone(), key, sk_info, lease))
})
.collect::<Vec<_>>();
for h in handles {
let (zttid, lease) = h.await??;
// It is ugly to pull leases from hash and then put it back, but
// otherwise we have to resort to long living per tli tasks (which
// would generate a lot of errors when etcd is down) as task wants to
// have 'static objects, we can't borrow to it.
leases.insert(zttid, lease);
}
// revive the lease
keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
sleep(push_interval).await;
}
}
@@ -274,12 +221,15 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
.await
.context("failed to subscribe for safekeeper info")?;
loop {
match subscription.value_updates.recv().await {
match subscription.fetch_data().await {
Some(new_info) => {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, new_info.key.id, false) {
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
for (zttid, sk_info) in new_info {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
for (safekeeper_id, info) in sk_info {
tli.record_safekeeper_info(&info, safekeeper_id).await?
}
}
}
}
None => {

View File

@@ -239,19 +239,6 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
});
} else if version == 5 {
info!("reading safekeeper control file version {}", version);
let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
if oldstate.timeline_start_lsn != Lsn(0) {
return Ok(oldstate);
}
// set special timeline_start_lsn because we don't know the real one
info!("setting timeline_start_lsn and local_start_lsn to Lsn(1)");
oldstate.timeline_start_lsn = Lsn(1);
oldstate.local_start_lsn = Lsn(1);
return Ok(oldstate);
}
bail!("unsupported safekeeper control file version {}", version)
}

View File

@@ -28,7 +28,7 @@ use utils::{
};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 6;
pub const SK_FORMAT_VERSION: u32 = 5;
const SK_PROTOCOL_VERSION: u32 = 2;
const UNKNOWN_SERVER_VERSION: u32 = 0;

View File

@@ -11,7 +11,7 @@ use serde::Serialize;
use tokio::sync::watch;
use std::cmp::{max, min};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fs::{self};
use std::sync::{Arc, Mutex, MutexGuard};
@@ -445,9 +445,9 @@ impl Timeline {
}
/// Prepare public safekeeper info for reporting.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result<SkTimelineInfo> {
let shared_state = self.mutex.lock().unwrap();
SkTimelineInfo {
Ok(SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
@@ -460,7 +460,7 @@ impl Timeline {
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
}
})
}
/// Update timeline state with peer safekeeper data.
@@ -625,8 +625,6 @@ impl GlobalTimelines {
zttid: ZTenantTimelineId,
create: bool,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("", timeline = %zttid.tenant_id).entered();
let mut state = TIMELINES_STATE.lock().unwrap();
match state.timelines.get(&zttid) {
@@ -669,7 +667,7 @@ impl GlobalTimelines {
}
/// Get ZTenantTimelineIDs of all active timelines.
pub fn get_active_timelines() -> HashSet<ZTenantTimelineId> {
pub fn get_active_timelines() -> Vec<ZTenantTimelineId> {
let state = TIMELINES_STATE.lock().unwrap();
state
.timelines

View File

@@ -45,7 +45,7 @@ If you want to run all tests that have the string "bench" in their names:
Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found.
`ZENITH_BIN`: The directory where zenith binaries can be found.
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.

View File

@@ -35,14 +35,9 @@ def test_createdb(neon_simple_env: NeonEnv):
with closing(db.connect(dbname='foodb')) as conn:
with conn.cursor() as cur:
# Check database size in both branches
cur.execute("""
select pg_size_pretty(pg_database_size('foodb')),
pg_size_pretty(
sum(pg_relation_size(oid, 'main'))
+sum(pg_relation_size(oid, 'vm'))
+sum(pg_relation_size(oid, 'fsm'))
) FROM pg_class where relisshared is false
""")
cur.execute(
'select pg_size_pretty(pg_database_size(%s)), pg_size_pretty(sum(pg_relation_size(oid))) from pg_class where relisshared is false;',
('foodb', ))
res = cur.fetchone()
# check that dbsize equals sum of all relation sizes, excluding shared ones
# This is how we define dbsize in neon for now

View File

@@ -1,5 +1,5 @@
# It's possible to run any regular test with the local fs remote storage via
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/zenith_zzz/'}" poetry ......
import shutil, os
from contextlib import closing

View File

@@ -8,6 +8,7 @@ import time
def test_timeline_size(neon_simple_env: NeonEnv):
env = neon_simple_env
# Branch at the point where only 100 rows were inserted
new_timeline_id = env.neon_cli.create_branch('test_timeline_size', 'empty')
client = env.pageserver.http_client()
@@ -22,6 +23,7 @@ def test_timeline_size(neon_simple_env: NeonEnv):
with conn.cursor() as cur:
cur.execute("SHOW neon.timeline_id")
# Create table, and insert the first 100 rows
cur.execute("CREATE TABLE foo (t text)")
cur.execute("""
INSERT INTO foo
@@ -41,51 +43,6 @@ def test_timeline_size(neon_simple_env: NeonEnv):
"current_logical_size_non_incremental"]
def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
env = neon_simple_env
new_timeline_id = env.neon_cli.create_branch('test_timeline_size', 'empty')
client = env.pageserver.http_client()
timeline_details = assert_local(client, env.initial_tenant, new_timeline_id)
assert timeline_details['local']['current_logical_size'] == timeline_details['local'][
'current_logical_size_non_incremental']
pgmain = env.postgres.create_start("test_timeline_size")
log.info("postgres is running on 'test_timeline_size' branch")
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SHOW neon.timeline_id")
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
cur.execute('CREATE DATABASE foodb')
with closing(pgmain.connect(dbname='foodb')) as conn:
with conn.cursor() as cur2:
cur2.execute("CREATE TABLE foo (t text)")
cur2.execute("""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 10) g
""")
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
cur.execute('DROP DATABASE foodb')
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
# wait until received_lsn_lag is 0
def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60):
started_at = time.time()

View File

@@ -50,7 +50,7 @@ A fixture is created with the decorator @pytest.fixture decorator.
See docs: https://docs.pytest.org/en/6.2.x/fixture.html
There are several environment variables that can control the running of tests:
NEON_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
ZENITH_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
There's no need to import this file to use it. It should be declared as a plugin
inside conftest.py, and that makes it available to all tests.
@@ -151,7 +151,7 @@ def pytest_configure(config):
return
# Find the neon binaries.
global neon_binpath
env_neon_bin = os.environ.get('NEON_BIN')
env_neon_bin = os.environ.get('ZENITH_BIN')
if env_neon_bin:
neon_binpath = env_neon_bin
else:

View File

@@ -1,6 +1,4 @@
import os
import threading
import time
from typing import List
import pytest
@@ -101,34 +99,3 @@ def test_compare_pg_stats_wal_with_pgbench_default(neon_with_baseline: PgCompare
env.pg_bin.run_capture(
['pgbench', f'-T{duration}', f'--random-seed={seed}', '-Mprepared', env.pg.connstr()])
env.flush()
@pytest.mark.parametrize("n_tables", [1, 10])
@pytest.mark.parametrize("duration", get_durations_matrix(10))
def test_compare_pg_stats_wo_with_heavy_write(neon_with_baseline: PgCompare,
n_tables: int,
duration: int,
pg_stats_wo: List[PgStatTable]):
env = neon_with_baseline
with env.pg.connect().cursor() as cur:
for i in range(n_tables):
cur.execute(
f"CREATE TABLE t{i}(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
)
def start_single_table_workload(table_id: int):
start = time.time()
with env.pg.connect().cursor() as cur:
while time.time() - start < duration:
cur.execute(f"INSERT INTO t{table_id} SELECT FROM generate_series(1,1000)")
with env.record_pg_stats(pg_stats_wo):
threads = [
threading.Thread(target=start_single_table_workload, args=(i, ))
for i in range(n_tables)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

View File

@@ -80,7 +80,6 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
thread.join()
@pytest.mark.timeout(1000)
@pytest.mark.parametrize("n_tables", [5])
@pytest.mark.parametrize("scale", get_scales_matrix(5))
@pytest.mark.parametrize("num_iters", [10])
@@ -122,7 +121,6 @@ def start_pgbench_simple_update_workload(env: PgCompare, duration: int):
env.flush()
@pytest.mark.timeout(1000)
@pytest.mark.parametrize("scale", get_scales_matrix(100))
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_simple_update_workload(pg_compare: PgCompare, scale: int, duration: int):
@@ -160,7 +158,6 @@ def start_pgbench_intensive_initialization(env: PgCompare, scale: int):
])
@pytest.mark.timeout(1000)
@pytest.mark.parametrize("scale", get_scales_matrix(1000))
def test_pgbench_intensive_init_workload(pg_compare: PgCompare, scale: int):
env = pg_compare