diff --git a/.circleci/ansible/staging.hosts b/.circleci/ansible/staging.hosts index 4273b885e1..29e4efbb19 100644 --- a/.circleci/ansible/staging.hosts +++ b/.circleci/ansible/staging.hosts @@ -1,6 +1,7 @@ [pageservers] #zenith-us-stage-ps-1 console_region_id=27 zenith-us-stage-ps-2 console_region_id=27 +zenith-us-stage-ps-3 console_region_id=27 [safekeepers] zenith-us-stage-sk-4 console_region_id=27 diff --git a/.circleci/config.yml b/.circleci/config.yml index 9aca415dc8..f64ba94cb4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -100,10 +100,8 @@ jobs: name: Rust build << parameters.build_type >> command: | if [[ $BUILD_TYPE == "debug" ]]; then - cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) CARGO_FLAGS= elif [[ $BUILD_TYPE == "release" ]]; then - cov_prefix=() CARGO_FLAGS="--release --features profiling" fi @@ -112,7 +110,7 @@ jobs: export RUSTC_WRAPPER=cachepot export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}" export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}" - "${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests + mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests cachepot -s - save_cache: @@ -128,32 +126,24 @@ jobs: name: cargo test command: | if [[ $BUILD_TYPE == "debug" ]]; then - cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) CARGO_FLAGS= elif [[ $BUILD_TYPE == "release" ]]; then - cov_prefix=() CARGO_FLAGS=--release fi - "${cov_prefix[@]}" cargo test $CARGO_FLAGS + cargo test $CARGO_FLAGS # Install the rust binaries, for use by test jobs - run: name: Install rust binaries command: | - if [[ $BUILD_TYPE == "debug" ]]; then - cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) - elif [[ $BUILD_TYPE == "release" ]]; then - cov_prefix=() - fi - binaries=$( - "${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps | + cargo metadata --format-version=1 --no-deps | jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name' ) test_exe_paths=$( - "${cov_prefix[@]}" cargo test --message-format=json --no-run | + cargo test --message-format=json --no-run | jq -r '.executable | select(. != null)' ) @@ -166,34 +156,15 @@ jobs: SRC=target/$BUILD_TYPE/$bin DST=/tmp/zenith/bin/$bin cp $SRC $DST - echo $DST >> /tmp/zenith/etc/binaries.list done - # Install test executables (for code coverage) - if [[ $BUILD_TYPE == "debug" ]]; then - for bin in $test_exe_paths; do - SRC=$bin - DST=/tmp/zenith/test_bin/$(basename $bin) - cp $SRC $DST - echo $DST >> /tmp/zenith/etc/binaries.list - done - fi - # Install the postgres binaries, for use by test jobs - run: name: Install postgres binaries command: | cp -a tmp_install /tmp/zenith/pg_install - - run: - name: Merge coverage data - command: | - # This will speed up workspace uploads - if [[ $BUILD_TYPE == "debug" ]]; then - scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge - fi - - # Save the rust binaries and coverage data for other jobs in this workflow. + # Save rust binaries for other jobs in the workflow - persist_to_workspace: root: /tmp/zenith paths: @@ -286,7 +257,7 @@ jobs: # no_output_timeout, specified here. no_output_timeout: 10m environment: - - ZENITH_BIN: /tmp/zenith/bin + - NEON_BIN: /tmp/zenith/bin - POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install - TEST_OUTPUT: /tmp/test_output # this variable will be embedded in perf test report @@ -314,12 +285,6 @@ jobs: export GITHUB_SHA=$CIRCLE_SHA1 - if [[ $BUILD_TYPE == "debug" ]]; then - cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) - elif [[ $BUILD_TYPE == "release" ]]; then - cov_prefix=() - fi - # Run the tests. # # The junit.xml file allows CircleCI to display more fine-grained test information @@ -330,7 +295,7 @@ jobs: # -n4 uses four processes to run tests via pytest-xdist # -s is not used to prevent pytest from capturing output, because tests are running # in parallel and logs are mixed between different tests - "${cov_prefix[@]}" ./scripts/pytest \ + ./scripts/pytest \ --junitxml=$TEST_OUTPUT/junit.xml \ --tb=short \ --verbose \ @@ -359,67 +324,12 @@ jobs: # The store_test_results step tells CircleCI where to find the junit.xml file. - store_test_results: path: /tmp/test_output - - run: - name: Merge coverage data - command: | - # This will speed up workspace uploads - if [[ $BUILD_TYPE == "debug" ]]; then - scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge - fi - # Save coverage data (if any) + # Save data (if any) - persist_to_workspace: root: /tmp/zenith paths: - "*" - coverage-report: - executor: neon-xlarge-executor - steps: - - attach_workspace: - at: /tmp/zenith - - checkout - - restore_cache: - name: Restore rust cache - keys: - # Require an exact match. While an out of date cache might speed up the build, - # there's no way to clean out old packages, so the cache grows every time something - # changes. - - v04-rust-cache-deps-debug-{{ checksum "Cargo.lock" }} - - run: - name: Build coverage report - command: | - COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1 - - scripts/coverage \ - --dir=/tmp/zenith/coverage report \ - --input-objects=/tmp/zenith/etc/binaries.list \ - --commit-url=$COMMIT_URL \ - --format=github - - run: - name: Upload coverage report - command: | - LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME - REPORT_URL=https://neondatabase.github.io/zenith-coverage-data/$CIRCLE_SHA1 - COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1 - - scripts/git-upload \ - --repo=https://$VIP_VAP_ACCESS_TOKEN@github.com/neondatabase/zenith-coverage-data.git \ - --message="Add code coverage for $COMMIT_URL" \ - copy /tmp/zenith/coverage/report $CIRCLE_SHA1 # COPY FROM TO_RELATIVE - - # Add link to the coverage report to the commit - curl -f -X POST \ - https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \ - -H "Accept: application/vnd.github.v3+json" \ - --user "$CI_ACCESS_TOKEN" \ - --data \ - "{ - \"state\": \"success\", - \"context\": \"zenith-coverage\", - \"description\": \"Coverage report is ready\", - \"target_url\": \"$REPORT_URL\" - }" - # Build neondatabase/neon:latest image and push it to Docker hub docker-image: docker: @@ -688,50 +598,6 @@ jobs: helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy.yaml --set image.tag=${DOCKER_TAG} --wait helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait - # Trigger a new remote CI job - remote-ci-trigger: - docker: - - image: cimg/base:2021.04 - parameters: - remote_repo: - type: string - environment: - REMOTE_REPO: << parameters.remote_repo >> - steps: - - run: - name: Set PR's status to pending - command: | - LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME - - curl -f -X POST \ - https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \ - -H "Accept: application/vnd.github.v3+json" \ - --user "$CI_ACCESS_TOKEN" \ - --data \ - "{ - \"state\": \"pending\", - \"context\": \"neon-cloud-e2e\", - \"description\": \"[$REMOTE_REPO] Remote CI job is about to start\" - }" - - run: - name: Request a remote CI test - command: | - LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME - - curl -f -X POST \ - https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \ - -H "Accept: application/vnd.github.v3+json" \ - --user "$CI_ACCESS_TOKEN" \ - --data \ - "{ - \"ref\": \"main\", - \"inputs\": { - \"ci_job_name\": \"neon-cloud-e2e\", - \"commit_hash\": \"$CIRCLE_SHA1\", - \"remote_repo\": \"$LOCAL_REPO\" - } - }" - workflows: build_and_test: jobs: @@ -774,12 +640,6 @@ workflows: save_perf_report: true requires: - build-neon-release - - coverage-report: - # Context passes credentials for gh api - context: CI_ACCESS_TOKEN - requires: - # TODO: consider adding more - - other-tests-debug - docker-image: # Context gives an ability to login context: Docker Hub @@ -880,14 +740,3 @@ workflows: - release requires: - docker-image-release - - remote-ci-trigger: - # Context passes credentials for gh api - context: CI_ACCESS_TOKEN - remote_repo: "neondatabase/cloud" - requires: - # XXX: Successful build doesn't mean everything is OK, but - # the job to be triggered takes so much time to complete (~22 min) - # that it's better not to wait for the commented-out steps - - build-neon-release - # - pg_regress-tests-release - # - other-tests-release diff --git a/.github/actions/run-python-test-set/action.yml b/.github/actions/run-python-test-set/action.yml index 94fac2ee99..48c0c2b925 100644 --- a/.github/actions/run-python-test-set/action.yml +++ b/.github/actions/run-python-test-set/action.yml @@ -2,25 +2,29 @@ name: 'Run python test' description: 'Runs a Neon python test set, performing all the required preparations before' inputs: - # Select the type of Rust build. Must be "release" or "debug". build_type: + description: 'Type of Rust (neon) and C (postgres) builds. Must be "release" or "debug".' required: true rust_toolchain: + description: 'Rust toolchain version to fetch the caches' required: true - # This parameter is required, to prevent the mistake of running all tests in one job. test_selection: + description: 'A python test suite to run' required: true - # Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr extra_params: + description: 'Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr' required: false default: '' needs_postgres_source: + description: 'Set to true if the test suite requires postgres source checked out' required: false default: 'false' run_in_parallel: + description: 'Whether to run tests in parallel' required: false default: 'true' save_perf_report: + description: 'Whether to upload the performance report' required: false default: 'false' @@ -60,7 +64,7 @@ runs: - name: Run pytest env: - ZENITH_BIN: /tmp/neon/bin + NEON_BIN: /tmp/neon/bin POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install TEST_OUTPUT: /tmp/test_output # this variable will be embedded in perf test report @@ -88,7 +92,7 @@ runs: fi if [[ "${{ inputs.build_type }}" == "debug" ]]; then - cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run) + cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run) elif [[ "${{ inputs.build_type }}" == "release" ]]; then cov_prefix=() fi @@ -117,3 +121,20 @@ runs: scripts/generate_and_push_perf_report.sh fi fi + + - name: Delete all data but logs + shell: bash -ex {0} + if: always() + run: | + du -sh /tmp/test_output/* + find /tmp/test_output -type f ! -name "*.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" ! -name "flamegraph.svg" ! -name "*.metrics" -delete + du -sh /tmp/test_output/* + + - name: Upload python test logs + if: always() + uses: actions/upload-artifact@v3 + with: + retention-days: 7 + if-no-files-found: error + name: python-test-${{ inputs.test_selection }}-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-logs + path: /tmp/test_output/ diff --git a/.github/actions/save-coverage-data/action.yml b/.github/actions/save-coverage-data/action.yml new file mode 100644 index 0000000000..7ad04cf1fe --- /dev/null +++ b/.github/actions/save-coverage-data/action.yml @@ -0,0 +1,17 @@ +name: 'Merge and upload coverage data' +description: 'Compresses and uploads the coverage data as an artifact' + +runs: + using: "composite" + steps: + - name: Merge coverage data + shell: bash -ex {0} + run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge + + - name: Upload coverage data + uses: actions/upload-artifact@v3 + with: + retention-days: 7 + if-no-files-found: error + name: coverage-data-artifact + path: /tmp/coverage/ diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 5f4dd754d2..81b4585714 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -1,13 +1,28 @@ -name: build_and_test -on: [ push ] +name: Test + +on: + push: + branches: + - main + pull_request: + defaults: run: shell: bash -ex {0} +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + RUST_BACKTRACE: 1 + COPT: '-Werror' + jobs: build-postgres: runs-on: [ self-hosted, Linux, k8s-runner ] strategy: + fail-fast: false matrix: build_type: [ debug, release ] rust_toolchain: [ 1.58 ] @@ -34,7 +49,7 @@ jobs: - name: Build postgres if: steps.cache_pg.outputs.cache-hit != 'true' - run: COPT='-Werror' mold -run make postgres -j$(nproc) + run: mold -run make postgres -j$(nproc) # actions/cache@v3 does not allow concurrently using the same cache across job steps, so use a separate cache - name: Prepare postgres artifact @@ -52,6 +67,7 @@ jobs: runs-on: [ self-hosted, Linux, k8s-runner ] needs: [ build-postgres ] strategy: + fail-fast: false matrix: build_type: [ debug, release ] rust_toolchain: [ 1.58 ] @@ -85,44 +101,39 @@ jobs: ~/.cargo/registry/ ~/.cargo/git/ target/ - key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} + # Fall back to older versions of the key, if no cache for current Cargo.lock was found + key: | + v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} + v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}- - name: Run cargo build run: | if [[ $BUILD_TYPE == "debug" ]]; then - cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run) + cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run) CARGO_FLAGS= elif [[ $BUILD_TYPE == "release" ]]; then cov_prefix=() CARGO_FLAGS="--release --features profiling" fi - export CACHEPOT_BUCKET=zenith-rust-cachepot - export RUSTC_WRAPPER=cachepot - export AWS_ACCESS_KEY_ID="${{ secrets.AWS_ACCESS_KEY_ID }}" - export AWS_SECRET_ACCESS_KEY="${{ secrets.AWS_SECRET_ACCESS_KEY }}" - export HOME=/home/runner "${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests - cachepot -s - name: Run cargo test run: | - export HOME=/home/runner if [[ $BUILD_TYPE == "debug" ]]; then - cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run) + cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run) CARGO_FLAGS= elif [[ $BUILD_TYPE == "release" ]]; then cov_prefix=() CARGO_FLAGS=--release fi - + "${cov_prefix[@]}" cargo test $CARGO_FLAGS - name: Install rust binaries run: | - export HOME=/home/runner if [[ $BUILD_TYPE == "debug" ]]; then - cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run) + cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run) elif [[ $BUILD_TYPE == "release" ]]; then cov_prefix=() fi @@ -137,39 +148,36 @@ jobs: jq -r '.executable | select(. != null)' ) - mkdir -p /tmp/neon/bin - mkdir -p /tmp/neon/test_bin - mkdir -p /tmp/neon/etc + mkdir -p /tmp/neon/bin/ + mkdir -p /tmp/neon/test_bin/ + mkdir -p /tmp/neon/etc/ + + # Keep bloated coverage data files away from the rest of the artifact + mkdir -p /tmp/coverage/ # Install target binaries for bin in $binaries; do SRC=target/$BUILD_TYPE/$bin DST=/tmp/neon/bin/$bin - cp $SRC $DST - echo $DST >> /tmp/neon/etc/binaries.list + cp "$SRC" "$DST" done - # Install test executables (for code coverage) + # Install test executables and write list of all binaries (for code coverage) if [[ $BUILD_TYPE == "debug" ]]; then + for bin in $binaries; do + echo "/tmp/neon/bin/$bin" >> /tmp/coverage/binaries.list + done for bin in $test_exe_paths; do SRC=$bin DST=/tmp/neon/test_bin/$(basename $bin) - cp $SRC $DST - echo $DST >> /tmp/neon/etc/binaries.list + cp "$SRC" "$DST" + echo "$DST" >> /tmp/coverage/binaries.list done fi - name: Install postgres binaries run: cp -a tmp_install /tmp/neon/pg_install - - name: Merge coverage data - run: | - export HOME=/home/runner - # This will speed up workspace uploads - if [[ $BUILD_TYPE == "debug" ]]; then - scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage merge - fi - - name: Prepare neon artifact run: tar -C /tmp/neon/ -czf ./neon.tgz . @@ -181,38 +189,17 @@ jobs: name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact path: ./neon.tgz - check-codestyle-python: - runs-on: [ self-hosted, Linux, k8s-runner ] - strategy: - matrix: - rust_toolchain: [ 1.58 ] - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - submodules: true - fetch-depth: 1 + # XXX: keep this after the binaries.list is formed, so the coverage can properly work later + - name: Merge and upload coverage data + if: matrix.build_type == 'debug' + uses: ./.github/actions/save-coverage-data - - name: Cache poetry deps - id: cache_poetry - uses: actions/cache@v3 - with: - path: ~/.cache/pypoetry/virtualenvs - key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }} - - - name: Install Python deps - run: ./scripts/pysync - - - name: Run yapf to ensure code format - run: poetry run yapf --recursive --diff . - - - name: Run mypy to check types - run: poetry run mypy . pg_regress-tests: runs-on: [ self-hosted, Linux, k8s-runner ] needs: [ build-neon ] strategy: + fail-fast: false matrix: build_type: [ debug, release ] rust_toolchain: [ 1.58 ] @@ -231,10 +218,15 @@ jobs: test_selection: batch_pg_regress needs_postgres_source: true + - name: Merge and upload coverage data + if: matrix.build_type == 'debug' + uses: ./.github/actions/save-coverage-data + other-tests: runs-on: [ self-hosted, Linux, k8s-runner ] needs: [ build-neon ] strategy: + fail-fast: false matrix: build_type: [ debug, release ] rust_toolchain: [ 1.58 ] @@ -252,10 +244,15 @@ jobs: rust_toolchain: ${{ matrix.rust_toolchain }} test_selection: batch_others + - name: Merge and upload coverage data + if: matrix.build_type == 'debug' + uses: ./.github/actions/save-coverage-data + benchmarks: runs-on: [ self-hosted, Linux, k8s-runner ] needs: [ build-neon ] strategy: + fail-fast: false matrix: build_type: [ release ] rust_toolchain: [ 1.58 ] @@ -273,4 +270,120 @@ jobs: rust_toolchain: ${{ matrix.rust_toolchain }} test_selection: performance run_in_parallel: false - # save_perf_report: true + save_perf_report: true + # XXX: no coverage data handling here, since benchmarks are run on release builds, + # while coverage is currently collected for the debug ones + + coverage-report: + runs-on: [ self-hosted, Linux, k8s-runner ] + needs: [ other-tests, pg_regress-tests ] + strategy: + fail-fast: false + matrix: + build_type: [ debug ] + rust_toolchain: [ 1.58 ] + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: true + fetch-depth: 1 + + - name: Restore cargo deps cache + id: cache_cargo + uses: actions/cache@v3 + with: + path: | + ~/.cargo/registry/ + ~/.cargo/git/ + target/ + key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} + + - name: Get Neon artifact for restoration + uses: actions/download-artifact@v3 + with: + name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact + path: ./neon-artifact/ + + - name: Extract Neon artifact + run: | + mkdir -p /tmp/neon/ + tar -xf ./neon-artifact/neon.tgz -C /tmp/neon/ + rm -rf ./neon-artifact/ + + - name: Restore coverage data + uses: actions/download-artifact@v3 + with: + name: coverage-data-artifact + path: /tmp/coverage/ + + - name: Merge coverage data + run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge + + - name: Build and upload coverage report + run: | + COMMIT_SHA=${{ github.event.pull_request.head.sha }} + COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}} + COMMIT_URL=https://github.com/${{ github.repository }}/commit/$COMMIT_SHA + + scripts/coverage \ + --dir=/tmp/coverage report \ + --input-objects=/tmp/coverage/binaries.list \ + --commit-url=$COMMIT_URL \ + --format=github + + REPORT_URL=https://${{ github.repository_owner }}.github.io/zenith-coverage-data/$COMMIT_SHA + + scripts/git-upload \ + --repo=https://${{ secrets.VIP_VAP_ACCESS_TOKEN }}@github.com/${{ github.repository_owner }}/zenith-coverage-data.git \ + --message="Add code coverage for $COMMIT_URL" \ + copy /tmp/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE + + # Add link to the coverage report to the commit + curl -f -X POST \ + https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \ + -H "Accept: application/vnd.github.v3+json" \ + --user "${{ secrets.CI_ACCESS_TOKEN }}" \ + --data \ + "{ + \"state\": \"success\", + \"context\": \"neon-coverage\", + \"description\": \"Coverage report is ready\", + \"target_url\": \"$REPORT_URL\" + }" + + trigger-e2e-tests: + runs-on: [ self-hosted, Linux, k8s-runner ] + needs: [ build-neon ] + steps: + - name: Set PR's status to pending and request a remote CI test + run: | + COMMIT_SHA=${{ github.event.pull_request.head.sha }} + COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}} + + REMOTE_REPO="${{ github.repository_owner }}/cloud" + + curl -f -X POST \ + https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \ + -H "Accept: application/vnd.github.v3+json" \ + --user "${{ secrets.CI_ACCESS_TOKEN }}" \ + --data \ + "{ + \"state\": \"pending\", + \"context\": \"neon-cloud-e2e\", + \"description\": \"[$REMOTE_REPO] Remote CI job is about to start\" + }" + + curl -f -X POST \ + https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \ + -H "Accept: application/vnd.github.v3+json" \ + --user "${{ secrets.CI_ACCESS_TOKEN }}" \ + --data \ + "{ + \"ref\": \"main\", + \"inputs\": { + \"ci_job_name\": \"neon-cloud-e2e\", + \"commit_hash\": \"$COMMIT_SHA\", + \"remote_repo\": \"${{ github.repository }}\" + } + }" diff --git a/.github/workflows/testing.yml b/.github/workflows/codestyle.yml similarity index 73% rename from .github/workflows/testing.yml rename to .github/workflows/codestyle.yml index aa1e152fb2..2b8a01e94e 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/codestyle.yml @@ -1,4 +1,4 @@ -name: Build and Test +name: Check code style and build on: push: @@ -6,15 +6,27 @@ on: - main pull_request: +defaults: + run: + shell: bash -ex {0} + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + RUST_BACKTRACE: 1 + jobs: - regression-check: + check-codestyle-rust: strategy: + fail-fast: false matrix: # If we want to duplicate this job for different # Rust toolchains (e.g. nightly or 1.37.0), add them here. rust_toolchain: [1.58] os: [ubuntu-latest, macos-latest] - timeout-minutes: 30 + timeout-minutes: 50 name: run regression test suite runs-on: ${{ matrix.os }} @@ -92,5 +104,30 @@ jobs: - name: Run cargo clippy run: ./run_clippy.sh - - name: Run cargo test - run: cargo test --all --all-targets + - name: Ensure all project builds + run: cargo build --all --all-targets + + check-codestyle-python: + runs-on: [ self-hosted, Linux, k8s-runner ] + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: false + fetch-depth: 1 + + - name: Cache poetry deps + id: cache_poetry + uses: actions/cache@v3 + with: + path: ~/.cache/pypoetry/virtualenvs + key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }} + + - name: Install Python deps + run: ./scripts/pysync + + - name: Run yapf to ensure code format + run: poetry run yapf --recursive --diff . + + - name: Run mypy to check types + run: poetry run mypy . diff --git a/Cargo.lock b/Cargo.lock index f4d3743676..ef1b7327c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,6 +461,7 @@ dependencies = [ "tar", "tokio", "tokio-postgres", + "urlencoding", "workspace_hack", ] @@ -3684,6 +3685,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b90931029ab9b034b300b797048cf23723400aa757e8a2bfb9d748102f9821" + [[package]] name = "utils" version = "0.1.0" diff --git a/Dockerfile b/Dockerfile index 62e0de7e15..ad85638af3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build Postgres -FROM zimg/rust:1.58 AS pg-build +FROM neondatabase/rust:1.58 AS pg-build WORKDIR /pg USER root @@ -14,7 +14,7 @@ RUN set -e \ && tar -C tmp_install -czf /postgres_install.tar.gz . # Build zenith binaries -FROM zimg/rust:1.58 AS build +FROM neondatabase/rust:1.58 AS build ARG GIT_VERSION=local ARG CACHEPOT_BUCKET=zenith-rust-cachepot @@ -46,9 +46,9 @@ RUN set -e \ && useradd -d /data zenith \ && chown -R zenith:zenith /data -COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/pageserver /usr/local/bin -COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/safekeeper /usr/local/bin -COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/proxy /usr/local/bin +COPY --from=build --chown=zenith:zenith /home/runner/target/release/pageserver /usr/local/bin +COPY --from=build --chown=zenith:zenith /home/runner/target/release/safekeeper /usr/local/bin +COPY --from=build --chown=zenith:zenith /home/runner/target/release/proxy /usr/local/bin COPY --from=pg-build /pg/tmp_install/ /usr/local/ COPY --from=pg-build /postgres_install.tar.gz /data/ diff --git a/Dockerfile.compute-tools b/Dockerfile.compute-tools index f0c9b9d56a..71770ae9ed 100644 --- a/Dockerfile.compute-tools +++ b/Dockerfile.compute-tools @@ -1,6 +1,6 @@ # First transient image to build compute_tools binaries # NB: keep in sync with rust image version in .circle/config.yml -FROM zimg/rust:1.58 AS rust-build +FROM neondatabase/rust:1.58 AS rust-build ARG CACHEPOT_BUCKET=zenith-rust-cachepot ARG AWS_ACCESS_KEY_ID @@ -15,4 +15,4 @@ RUN set -e \ # Final image that only has one binary FROM debian:buster-slim -COPY --from=rust-build /home/circleci/project/target/release/compute_ctl /usr/local/bin/compute_ctl +COPY --from=rust-build /home/runner/target/release/compute_ctl /usr/local/bin/compute_ctl diff --git a/README.md b/README.md index f63c21459e..6a4fc5ce1b 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh 1. Install XCode and dependencies ``` xcode-select --install -brew install protobuf etcd +brew install protobuf etcd openssl ``` 2. [Install Rust](https://www.rust-lang.org/tools/install) diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 42db763961..a47f9998e6 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -18,4 +18,5 @@ serde_json = "1" tar = "0.4" tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] } tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +urlencoding = "2.1.0" workspace_hack = { version = "0.1", path = "../workspace_hack" } diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index a2e6874a28..abf7081cb7 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -289,6 +289,7 @@ impl ComputeNode { handle_roles(&self.spec, &mut client)?; handle_databases(&self.spec, &mut client)?; + handle_role_deletions(self, &mut client)?; handle_grants(&self.spec, &mut client)?; create_writablity_check_data(&mut client)?; diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index e88df56a65..d2cfb6d726 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -2,9 +2,11 @@ use std::path::Path; use anyhow::Result; use log::{info, log_enabled, warn, Level}; -use postgres::Client; +use postgres::{Client, NoTls}; use serde::Deserialize; +use urlencoding::encode; +use crate::compute::ComputeNode; use crate::config; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; @@ -97,18 +99,13 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { // Process delta operations first if let Some(ops) = &spec.delta_operations { - info!("processing delta operations on roles"); + info!("processing role renames"); for op in ops { match op.action.as_ref() { - // We do not check either role exists or not, - // Postgres will take care of it for us "delete_role" => { - let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.quote()); - - warn!("deleting role '{}'", &op.name); - xact.execute(query.as_str(), &[])?; + // no-op now, roles will be deleted at the end of configuration } - // Renaming role drops its password, since tole name is + // Renaming role drops its password, since role name is // used as a salt there. It is important that this role // is recorded with a new `name` in the `roles` list. // Follow up roles update will set the new password. @@ -182,7 +179,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { xact.execute(query.as_str(), &[])?; let grant_query = format!( - "grant pg_read_all_data, pg_write_all_data to {}", + "GRANT pg_read_all_data, pg_write_all_data TO {}", name.quote() ); xact.execute(grant_query.as_str(), &[])?; @@ -197,6 +194,68 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { Ok(()) } +/// Reassign all dependent objects and delete requested roles. +pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> { + let spec = &node.spec; + + // First, reassign all dependent objects to db owners. + if let Some(ops) = &spec.delta_operations { + info!("reassigning dependent objects of to-be-deleted roles"); + for op in ops { + if op.action == "delete_role" { + reassign_owned_objects(node, &op.name)?; + } + } + } + + // Second, proceed with role deletions. + let mut xact = client.transaction()?; + if let Some(ops) = &spec.delta_operations { + info!("processing role deletions"); + for op in ops { + // We do not check either role exists or not, + // Postgres will take care of it for us + if op.action == "delete_role" { + let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.quote()); + + warn!("deleting role '{}'", &op.name); + xact.execute(query.as_str(), &[])?; + } + } + } + + Ok(()) +} + +// Reassign all owned objects in all databases to the owner of the database. +fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> { + for db in &node.spec.cluster.databases { + if db.owner != *role_name { + let db_name_encoded = format!("/{}", encode(&db.name)); + let db_connstr = node.connstr.replacen("/postgres", &db_name_encoded, 1); + let mut client = Client::connect(&db_connstr, NoTls)?; + + // This will reassign all dependent objects to the db owner + let reassign_query = format!( + "REASSIGN OWNED BY {} TO {}", + role_name.quote(), + db.owner.quote() + ); + info!( + "reassigning objects owned by '{}' in db '{}' to '{}'", + role_name, &db.name, &db.owner + ); + client.simple_query(&reassign_query)?; + + // This now will only drop privileges of the role + let drop_query = format!("DROP OWNED BY {}", role_name.quote()); + client.simple_query(&drop_query)?; + } + } + + Ok(()) +} + /// It follows mostly the same logic as `handle_roles()` excepting that we /// does not use an explicit transactions block, since major database operations /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level @@ -294,13 +353,26 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { pub fn handle_grants(spec: &ComputeSpec, client: &mut Client) -> Result<()> { info!("cluster spec grants:"); + // We now have a separate `web_access` role to connect to the database + // via the web interface and proxy link auth. And also we grant a + // read / write all data privilege to every role. So also grant + // create to everyone. + // XXX: later we should stop messing with Postgres ACL in such horrible + // ways. + let roles = spec + .cluster + .roles + .iter() + .map(|r| r.name.quote()) + .collect::>(); + for db in &spec.cluster.databases { let dbname = &db.name; let query: String = format!( "GRANT CREATE ON DATABASE {} TO {}", dbname.quote(), - db.owner.quote() + roles.join(", ") ); info!("grant query {}", &query); diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 38d4a403c2..8f698977a9 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -6,17 +6,13 @@ pub mod subscription_key; /// All broker values, possible to use when dealing with etcd. pub mod subscription_value; -use std::{ - collections::{hash_map, HashMap}, - str::FromStr, -}; +use std::str::FromStr; use serde::de::DeserializeOwned; use subscription_key::SubscriptionKey; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::*; -use utils::zid::{NodeId, ZTenantTimelineId}; use crate::subscription_key::SubscriptionFullKey; @@ -28,18 +24,17 @@ pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon"; /// A way to control the data retrieval from a certain subscription. pub struct BrokerSubscription { - value_updates: mpsc::UnboundedReceiver>>, + /// An unbounded channel to fetch the relevant etcd updates from. + pub value_updates: mpsc::UnboundedReceiver>, key: SubscriptionKey, - watcher_handle: JoinHandle>, + /// A subscription task handle, to allow waiting on it for the task to complete. + /// Both the updates channel and the handle require `&mut`, so it's better to keep + /// both `pub` to allow using both in the same structures without borrow checker complaining. + pub watcher_handle: JoinHandle>, watcher: Watcher, } impl BrokerSubscription { - /// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet. - pub async fn fetch_data(&mut self) -> Option>> { - self.value_updates.recv().await - } - /// Cancels the subscription, stopping the data poller and waiting for it to shut down. pub async fn cancel(mut self) -> Result<(), BrokerError> { self.watcher.cancel().await.map_err(|e| { @@ -48,15 +43,41 @@ impl BrokerSubscription { format!("Failed to cancel broker subscription, kind: {:?}", self.key), ) })?; - self.watcher_handle.await.map_err(|e| { - BrokerError::InternalError(format!( - "Failed to join the broker value updates task, kind: {:?}, error: {e}", - self.key - )) - })? + match (&mut self.watcher_handle).await { + Ok(res) => res, + Err(e) => { + if e.is_cancelled() { + // don't error on the tasks that are cancelled already + Ok(()) + } else { + Err(BrokerError::InternalError(format!( + "Panicked during broker subscription task, kind: {:?}, error: {e}", + self.key + ))) + } + } + } } } +impl Drop for BrokerSubscription { + fn drop(&mut self) { + // we poll data from etcd into the channel in the same struct, so if the whole struct gets dropped, + // no more data is used by the receiver and it's safe to cancel and drop the whole etcd subscription task. + self.watcher_handle.abort(); + } +} + +/// An update from the etcd broker. +pub struct BrokerUpdate { + /// Etcd generation version, the bigger the more actual the data is. + pub etcd_version: i64, + /// Etcd key for the corresponding value, parsed from the broker KV. + pub key: SubscriptionFullKey, + /// Current etcd value, parsed from the broker KV. + pub value: V, +} + #[derive(Debug, thiserror::Error)] pub enum BrokerError { #[error("Etcd client error: {0}. Context: {1}")] @@ -124,41 +145,21 @@ where break; } - let mut value_updates: HashMap> = HashMap::new(); - // Keep track that the timeline data updates from etcd arrive in the right order. - // https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas - // > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering. - let mut value_etcd_versions: HashMap = HashMap::new(); - - let events = resp.events(); debug!("Processing {} events", events.len()); for event in events { if EventType::Put == event.event_type() { if let Some(new_etcd_kv) = event.kv() { - let new_kv_version = new_etcd_kv.version(); - match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) { - Ok(Some((key, value))) => match value_updates - .entry(key.id) - .or_default() - .entry(key.node_id) - { - hash_map::Entry::Occupied(mut o) => { - let old_etcd_kv_version = value_etcd_versions.get(&key.id).copied().unwrap_or(i64::MIN); - if old_etcd_kv_version < new_kv_version { - o.insert(value); - value_etcd_versions.insert(key.id,new_kv_version); - } else { - debug!("Skipping etcd timeline update due to older version compared to one that's already stored"); - } - } - hash_map::Entry::Vacant(v) => { - v.insert(value); - value_etcd_versions.insert(key.id,new_kv_version); - } - }, + Ok(Some((key, value))) => if let Err(e) = value_updates_sender.send(BrokerUpdate { + etcd_version: new_etcd_kv.version(), + key, + value, + }) { + info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}"); + break; + }, Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"), Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"), Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"), @@ -166,13 +167,6 @@ where } } } - - if !value_updates.is_empty() { - if let Err(e) = value_updates_sender.send(value_updates) { - info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}"); - break; - } - } } Ok(()) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 079f477f75..77c320a181 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -733,17 +733,10 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?; - let all_rels = timeline.list_rels(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?; - let mut total_blocks: i64 = 0; + let total_blocks = + timeline.get_db_size(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?; - for rel in all_rels { - if rel.forknum == 0 { - let n_blocks = timeline.get_rel_size(rel, lsn).unwrap_or(0); - total_blocks += n_blocks as i64; - } - } - - let db_size = total_blocks * pg_constants::BLCKSZ as i64; + let db_size = total_blocks as i64 * pg_constants::BLCKSZ as i64; Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse { db_size, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 59a53d68a1..ce305a55f4 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -123,6 +123,19 @@ impl DatadirTimeline { self.tline.get(key, lsn) } + // Get size of a database in blocks + pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result { + let mut total_blocks = 0; + + let rels = self.list_rels(spcnode, dbnode, lsn)?; + + for rel in rels { + let n_blocks = self.get_rel_size(rel, lsn)?; + total_blocks += n_blocks as usize; + } + Ok(total_blocks) + } + /// Get size of a relation file pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result { ensure!(tag.relnode != 0, "invalid relnode"); @@ -667,6 +680,10 @@ impl<'a, R: Repository> DatadirModification<'a, R> { } pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> Result<()> { + let req_lsn = self.tline.get_last_record_lsn(); + + let total_blocks = self.tline.get_db_size(spcnode, dbnode, req_lsn)?; + // Remove entry from dbdir let buf = self.get(DBDIR_KEY)?; let mut dir = DbDirectory::des(&buf)?; @@ -680,7 +697,8 @@ impl<'a, R: Repository> DatadirModification<'a, R> { ); } - // FIXME: update pending_nblocks + // Update logical database size. + self.pending_nblocks -= total_blocks as isize; // Delete all relations and metadata files for the spcnode/dnode self.delete(dbdir_key_range(spcnode, dbnode)); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 82401e1d8c..fd9468a101 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -15,66 +15,38 @@ //! //! * handle the actual connection and WAL streaming //! -//! Handle happens dynamically, by portions of WAL being processed and registered in the server. +//! Handling happens dynamically, by portions of WAL being processed and registered in the server. //! Along with the registration, certain metadata is written to show WAL streaming progress and rely on that when considering safekeepers for connection. //! -//! ## Implementation details -//! -//! WAL receiver's implementation consists of 3 kinds of nested loops, separately handling the logic from the bullets above: -//! -//! * [`init_wal_receiver_main_thread`], a wal receiver main thread, containing the control async loop: timeline addition/removal and interruption of a whole thread handling. -//! The loop is infallible, always trying to continue with the new tasks, the only place where it can fail is its initialization. -//! All of the code inside the loop is either async or a spawn_blocking wrapper around the sync code. -//! -//! * [`timeline_wal_broker_loop_step`], a broker task, handling the etcd broker subscription and polling, safekeeper selection logic and [re]connects. -//! On every concequent broker/wal streamer connection attempt, the loop steps are forced to wait for some time before running, -//! increasing with the number of attempts (capped with some fixed value). -//! This is done endlessly, to ensure we don't miss the WAL streaming when it gets available on one of the safekeepers. -//! -//! Apart from the broker management, it keeps the wal streaming connection open, with the safekeeper having the most advanced timeline state. -//! The connection could be closed from safekeeper side (with error or not), could be cancelled from pageserver side from time to time. -//! -//! * [`connection_handler::handle_walreceiver_connection`], a wal streaming task, opening the libpq connection and reading the data out of it to the end. -//! Does periodic reporting of the progress, to share some of the data via external HTTP API and to ensure we're able to switch connections when needed. -//! -//! Every task is cancellable via its separate cancellation channel, -//! also every such task's dependency (broker subscription or the data source channel) cancellation/drop triggers the corresponding task cancellation either. +//! The current module contains high-level primitives used in the submodules; general synchronization, timeline acknowledgement and shutdown logic. -mod connection_handler; +mod connection_manager; +mod walreceiver_connection; -use crate::config::PageServerConf; -use crate::http::models::WalReceiverEntry; -use crate::repository::Timeline; -use crate::tenant_mgr::{self, LocalTimelineUpdate, TenantState}; -use crate::thread_mgr::ThreadKind; -use crate::{thread_mgr, DatadirTimelineImpl}; use anyhow::{ensure, Context}; -use chrono::{NaiveDateTime, Utc}; -use etcd_broker::{ - subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, - Client, -}; +use etcd_broker::Client; use itertools::Itertools; use once_cell::sync::Lazy; use std::cell::Cell; use std::collections::{hash_map, HashMap, HashSet}; +use std::future::Future; use std::num::NonZeroU64; -use std::ops::ControlFlow; use std::sync::Arc; use std::thread_local; use std::time::Duration; -use tokio::select; use tokio::{ + select, sync::{mpsc, watch, RwLock}, task::JoinHandle, }; use tracing::*; use url::Url; -use utils::lsn::Lsn; -use utils::pq_proto::ReplicationFeedback; -use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId}; -use self::connection_handler::{WalConnectionEvent, WalReceiverConnection}; +use crate::config::PageServerConf; +use crate::http::models::WalReceiverEntry; +use crate::tenant_mgr::{self, LocalTimelineUpdate, TenantState}; +use crate::thread_mgr::{self, ThreadKind}; +use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; thread_local! { // Boolean that is true only for WAL receiver threads @@ -125,7 +97,7 @@ pub fn init_wal_receiver_main_thread( .build() .context("Failed to create storage sync runtime")?; let etcd_client = runtime - .block_on(etcd_broker::Client::connect(etcd_endpoints, None)) + .block_on(Client::connect(etcd_endpoints, None)) .context("Failed to connect to etcd")?; thread_mgr::spawn( @@ -162,6 +134,97 @@ pub fn init_wal_receiver_main_thread( .context("Failed to spawn wal receiver main thread") } +async fn shutdown_all_wal_connections( + local_timeline_wal_receivers: &mut HashMap>>, +) { + info!("Shutting down all WAL connections"); + let mut broker_join_handles = Vec::new(); + for (tenant_id, timelines) in local_timeline_wal_receivers.drain() { + for (timeline_id, handles) in timelines { + handles.cancellation.send(()).ok(); + broker_join_handles.push(( + ZTenantTimelineId::new(tenant_id, timeline_id), + handles.handle, + )); + } + } + + let mut tenants = HashSet::with_capacity(broker_join_handles.len()); + for (id, broker_join_handle) in broker_join_handles { + tenants.insert(id.tenant_id); + debug!("Waiting for wal broker for timeline {id} to finish"); + if let Err(e) = broker_join_handle.await { + error!("Failed to join on wal broker for timeline {id}: {e}"); + } + } + if let Err(e) = tokio::task::spawn_blocking(move || { + for tenant_id in tenants { + if let Err(e) = tenant_mgr::set_tenant_state(tenant_id, TenantState::Idle) { + error!("Failed to make tenant {tenant_id} idle: {e:?}"); + } + } + }) + .await + { + error!("Failed to await a task to make all tenants idle: {e:?}"); + } +} + +/// A handle of an asynchronous task. +/// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`] +/// and a cancellation channel that it can listen to for earlier interrupts. +/// +/// Note that the communication happens via the `watch` channel, that does not accumulate the events, replacing the old one with the never one on submission. +/// That may lead to certain events not being observed by the listener. +#[derive(Debug)] +struct TaskHandle { + handle: JoinHandle<()>, + events_receiver: watch::Receiver>, + cancellation: watch::Sender<()>, +} + +#[derive(Debug, Clone)] +pub enum TaskEvent { + Started, + NewEvent(E), + End(Result<(), String>), +} + +impl TaskHandle { + /// Initializes the task, starting it immediately after the creation. + pub fn spawn( + task: impl FnOnce(Arc>>, watch::Receiver<()>) -> Fut + Send + 'static, + ) -> Self + where + Fut: Future> + Send, + E: Sync + Send + 'static, + { + let (cancellation, cancellation_receiver) = watch::channel(()); + let (events_sender, events_receiver) = watch::channel(TaskEvent::Started); + let events_sender = Arc::new(events_sender); + + let sender = Arc::clone(&events_sender); + let handle = tokio::task::spawn(async move { + let task_result = task(sender, cancellation_receiver).await; + events_sender.send(TaskEvent::End(task_result)).ok(); + }); + + TaskHandle { + handle, + events_receiver, + cancellation, + } + } + + /// Aborts current task, waiting for it to finish. + async fn shutdown(self) { + self.cancellation.send(()).ok(); + if let Err(e) = self.handle.await { + error!("Task failed to shut down: {e}") + } + } +} + /// A step to process timeline attach/detach events to enable/disable the corresponding WAL receiver machinery. /// In addition to WAL streaming management, the step ensures that corresponding tenant has its service threads enabled or disabled. /// This is done here, since only walreceiver knows when a certain tenant has no streaming enabled. @@ -171,10 +234,7 @@ async fn wal_receiver_main_thread_loop_step<'a>( broker_prefix: &'a str, etcd_client: &'a Client, timeline_updates_receiver: &'a mut mpsc::UnboundedReceiver, - local_timeline_wal_receivers: &'a mut HashMap< - ZTenantId, - HashMap, - >, + local_timeline_wal_receivers: &'a mut HashMap>>, ) { // Only react on updates from [`tenant_mgr`] on local timeline attach/detach. match timeline_updates_receiver.recv().await { @@ -185,13 +245,8 @@ async fn wal_receiver_main_thread_loop_step<'a>( LocalTimelineUpdate::Detach(id) => { match local_timeline_wal_receivers.get_mut(&id.tenant_id) { Some(wal_receivers) => { - if let hash_map::Entry::Occupied(mut o) = wal_receivers.entry(id.timeline_id) { - if let Err(e) = o.get_mut().shutdown(id).await { - error!("Failed to shut down timeline {id} wal receiver handle: {e:#}"); - return; - } else { - o.remove(); - } + if let hash_map::Entry::Occupied(o) = wal_receivers.entry(id.timeline_id) { + o.remove().shutdown().await } if wal_receivers.is_empty() { if let Err(e) = change_tenant_state(id.tenant_id, TenantState::Idle).await { @@ -207,11 +262,11 @@ async fn wal_receiver_main_thread_loop_step<'a>( } // Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly. LocalTimelineUpdate::Attach(new_id, new_timeline) => { - let timelines = local_timeline_wal_receivers + let timeline_connection_managers = local_timeline_wal_receivers .entry(new_id.tenant_id) .or_default(); - if timelines.is_empty() { + if timeline_connection_managers.is_empty() { if let Err(e) = change_tenant_state(new_id.tenant_id, TenantState::Active).await { @@ -220,13 +275,14 @@ async fn wal_receiver_main_thread_loop_step<'a>( } } - let vacant_timeline_entry = match timelines.entry(new_id.timeline_id) { - hash_map::Entry::Occupied(_) => { - debug!("Attepted to readd an existing timeline {new_id}, ignoring"); - return; - } - hash_map::Entry::Vacant(v) => v, - }; + let vacant_connection_manager_entry = + match timeline_connection_managers.entry(new_id.timeline_id) { + hash_map::Entry::Occupied(_) => { + debug!("Attepted to readd an existing timeline {new_id}, ignoring"); + return; + } + hash_map::Entry::Vacant(v) => v, + }; let (wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag) = match fetch_tenant_settings(new_id.tenant_id).await { @@ -248,48 +304,17 @@ async fn wal_receiver_main_thread_loop_step<'a>( ); } - let (cancellation_sender, mut cancellation_receiver) = watch::channel(()); - let mut wal_connection_manager = WalConnectionManager { - id: new_id, - timeline: Arc::clone(&new_timeline), - wal_connect_timeout, - lagging_wal_timeout, - max_lsn_wal_lag, - wal_connection_data: None, - wal_connection_attempt: 0, - }; - - let broker_prefix = broker_prefix.to_string(); - let mut loop_client = etcd_client.clone(); - let broker_join_handle = tokio::spawn(async move { - info!("WAL receiver broker started, connecting to etcd"); - let mut cancellation = cancellation_receiver.clone(); - loop { - select! { - _ = cancellation.changed() => { - info!("Wal broker loop cancelled, shutting down"); - break; - }, - step_result = timeline_wal_broker_loop_step( - &broker_prefix, - &mut loop_client, - &mut wal_connection_manager, - &mut cancellation_receiver, - ) => match step_result { - Ok(ControlFlow::Break(())) => { - break; - } - Ok(ControlFlow::Continue(())) => {} - Err(e) => warn!("Error during wal receiver main thread step for timeline {new_id}: {e:#}"), - } - } - } - }.instrument(info_span!("timeline", id = %new_id))); - - vacant_timeline_entry.insert(TimelineWalBrokerLoopHandle { - broker_join_handle, - cancellation_sender, - }); + vacant_connection_manager_entry.insert( + connection_manager::spawn_connection_manager_task( + new_id, + broker_prefix.to_owned(), + etcd_client.clone(), + new_timeline, + wal_connect_timeout, + lagging_wal_timeout, + max_lsn_wal_lag, + ), + ); } } } @@ -324,859 +349,3 @@ async fn change_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> an .await .with_context(|| format!("Failed to spawn activation task for tenant {tenant_id}"))? } - -async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) { - if n == 0 { - return; - } - let seconds_to_wait = base.powf(f64::from(n) - 1.0).min(max_seconds); - info!("Backoff: waiting {seconds_to_wait} seconds before proceeding with the task"); - tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; -} - -async fn shutdown_all_wal_connections( - local_timeline_wal_receivers: &mut HashMap< - ZTenantId, - HashMap, - >, -) { - info!("Shutting down all WAL connections"); - let mut broker_join_handles = Vec::new(); - for (tenant_id, timelines) in local_timeline_wal_receivers.drain() { - for (timeline_id, handles) in timelines { - handles.cancellation_sender.send(()).ok(); - broker_join_handles.push(( - ZTenantTimelineId::new(tenant_id, timeline_id), - handles.broker_join_handle, - )); - } - } - - let mut tenants = HashSet::with_capacity(broker_join_handles.len()); - for (id, broker_join_handle) in broker_join_handles { - tenants.insert(id.tenant_id); - debug!("Waiting for wal broker for timeline {id} to finish"); - if let Err(e) = broker_join_handle.await { - error!("Failed to join on wal broker for timeline {id}: {e}"); - } - } - if let Err(e) = tokio::task::spawn_blocking(move || { - for tenant_id in tenants { - if let Err(e) = tenant_mgr::set_tenant_state(tenant_id, TenantState::Idle) { - error!("Failed to make tenant {tenant_id} idle: {e:?}"); - } - } - }) - .await - { - error!("Failed to spawn a task to make all tenants idle: {e:?}"); - } -} - -/// Broker WAL loop handle to cancel the loop safely when needed. -struct TimelineWalBrokerLoopHandle { - broker_join_handle: JoinHandle<()>, - cancellation_sender: watch::Sender<()>, -} - -impl TimelineWalBrokerLoopHandle { - /// Stops the broker loop, waiting for its current task to finish. - async fn shutdown(&mut self, id: ZTenantTimelineId) -> anyhow::Result<()> { - self.cancellation_sender.send(()).context( - "Unexpected: cancellation sender is dropped before the receiver in the loop is", - )?; - debug!("Waiting for wal receiver for timeline {id} to finish"); - let handle = &mut self.broker_join_handle; - handle - .await - .with_context(|| format!("Failed to join the wal reveiver broker for timeline {id}")) - } -} - -/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker. -/// Based on the updates, desides whether to start, keep or stop a WAL receiver task. -async fn timeline_wal_broker_loop_step( - broker_prefix: &str, - etcd_client: &mut Client, - wal_connection_manager: &mut WalConnectionManager, - cancellation: &mut watch::Receiver<()>, -) -> anyhow::Result> { - let id = wal_connection_manager.id; - - // Endlessly try to subscribe for broker updates for a given timeline. - // If there are no safekeepers to maintain the lease, the timeline subscription will be inavailable in the broker and the operation will fail constantly. - // This is ok, pageservers should anyway try subscribing (with some backoff) since it's the only way they can get the timeline WAL anyway. - let mut broker_subscription: BrokerSubscription; - let mut attempt = 0; - loop { - select! { - _ = cancellation.changed() => { - info!("Subscription backoff cancelled, shutting down"); - return Ok(ControlFlow::Break(())); - }, - _ = exponential_backoff(attempt, 2.0, 60.0) => {}, - } - attempt += 1; - - select! { - _ = cancellation.changed() => { - info!("Broker subscription loop cancelled, shutting down"); - return Ok(ControlFlow::Break(())); - }, - new_subscription = etcd_broker::subscribe_for_json_values( - etcd_client, - SubscriptionKey::sk_timeline_info(broker_prefix.to_owned(), id), - ) - .instrument(info_span!("etcd_subscription")) => match new_subscription { - Ok(new_subscription) => { - broker_subscription = new_subscription; - break; - } - Err(e) => { - warn!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in etcd: {e:#}"); - continue; - } - }, - - } - } - - info!("Subscribed for etcd timeline changes, considering walreceiver connections"); - - loop { - select! { - // the order of the polls is especially important here, since the first task to complete gets selected and the others get dropped (cancelled). - // place more frequetly updated tasks below to ensure the "slow" tasks are also reacted to. - biased; - // first, the cancellations are checked, to ensure we exit eagerly - _ = cancellation.changed() => { - info!("Broker loop cancelled, shutting down"); - break; - } - // then, we check for new events from the WAL connection: the existing connection should either return some progress data, - // or block, allowing other tasks in this `select!` to run first. - // - // We set a "timebomb" in the polling method, that waits long enough and cancels the entire loop if nothing happens during the wait. - // The wait is only initiated when no data (or a "channel closed" data) is received from the loop, ending with the break flow return. - // While waiting, more broker events are expected to be retrieved from etcd (currently, every safekeeper posts ~1 message/second). - // The timebomb ensures that we don't get stuck for too long on any of the WAL/etcd event polling, rather restarting the subscription entirely. - // - // We cannot return here eagerly on no WAL task data, since the result will get selected to early, not allowing etcd tasks to be polled properly. - // We cannot move etcd tasks above this select, since they are very frequent to finish and WAL events might get ignored. - // We need WAL events to periodically update the external data, so we cannot simply await the task result on the handler here. - wal_receiver_poll_result = wal_connection_manager.poll_connection_event_or_cancel() => match wal_receiver_poll_result { - ControlFlow::Break(()) => break, - ControlFlow::Continue(()) => {}, - }, - // finally, if no other tasks are completed, get another broker update and possibly reconnect - updates = broker_subscription.fetch_data() => match updates { - Some(mut all_timeline_updates) => { - match all_timeline_updates.remove(&id) { - Some(subscribed_timeline_updates) => { - match wal_connection_manager.select_connection_candidate(subscribed_timeline_updates) { - Some(candidate) => { - info!("Switching to different safekeeper {} for timeline {id}, reason: {:?}", candidate.safekeeper_id, candidate.reason); - wal_connection_manager.change_connection(candidate.safekeeper_id, candidate.wal_producer_connstr).await; - }, - None => debug!("No connection candidate was selected for timeline"), - } - } - // XXX: If we subscribe for a certain timeline, we expect only its data to come. - // But somebody could propagate a new etcd key, that has the same prefix as the subscribed one, then we'll get odd data. - // This is an error, we don't want to have overlapping prefixes for timelines, but we can complain and thow those away instead of panicking, - // since the next poll might bring the correct data. - None => error!("Timeline has an active broker subscription, but got no updates. Other data length: {}", all_timeline_updates.len()), - } - }, - None => { - info!("Subscription source end was dropped, no more updates are possible, shutting down"); - break; - }, - }, - } - } - - info!("Waiting for the current connection to close"); - wal_connection_manager.close_connection().await; - broker_subscription - .cancel() - .await - .with_context(|| format!("Failed to cancel timeline {id} subscription in etcd"))?; - Ok(ControlFlow::Continue(())) -} - -/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible. -struct WalConnectionManager { - id: ZTenantTimelineId, - timeline: Arc, - wal_connect_timeout: Duration, - lagging_wal_timeout: Duration, - max_lsn_wal_lag: NonZeroU64, - wal_connection_attempt: u32, - wal_connection_data: Option, -} - -#[derive(Debug)] -struct WalConnectionData { - safekeeper_id: NodeId, - connection: WalReceiverConnection, - connection_init_time: NaiveDateTime, - last_wal_receiver_data: Option<(ReplicationFeedback, NaiveDateTime)>, -} - -#[derive(Debug, PartialEq, Eq)] -struct NewWalConnectionCandidate { - safekeeper_id: NodeId, - wal_producer_connstr: String, - reason: ReconnectReason, -} - -/// Stores the reason why WAL connection was switched, for furter debugging purposes. -#[derive(Debug, PartialEq, Eq)] -enum ReconnectReason { - NoExistingConnection, - LaggingWal { - current_lsn: Lsn, - new_lsn: Lsn, - threshold: NonZeroU64, - }, - NoWalTimeout { - last_wal_interaction: NaiveDateTime, - check_time: NaiveDateTime, - threshold: Duration, - }, -} - -impl WalConnectionManager { - /// Tries to get more data from the WAL connection. - /// If the WAL connection channel is dropped or no data is retrieved, a "timebomb" future is started to break the existing broker subscription. - /// This future is intended to be used in the `select!` loop, so lengthy future normally gets dropped due to other futures completing. - /// If not, it's better to cancel the entire "stuck" loop and start over. - async fn poll_connection_event_or_cancel(&mut self) -> ControlFlow<(), ()> { - let (connection_data, wal_receiver_event) = match self.wal_connection_data.as_mut() { - Some(connection_data) => match connection_data.connection.next_event().await { - Some(event) => (connection_data, event), - None => { - warn!("WAL receiver event source stopped sending messages, waiting for other events to arrive"); - tokio::time::sleep(Duration::from_secs(30)).await; - warn!("WAL receiver without a connection spent sleeping 30s without being interrupted, aborting the loop"); - return ControlFlow::Break(()); - } - }, - None => { - tokio::time::sleep(Duration::from_secs(30)).await; - warn!("WAL receiver without a connection spent sleeping 30s without being interrupted, aborting the loop"); - return ControlFlow::Break(()); - } - }; - - match wal_receiver_event { - WalConnectionEvent::Started => { - self.wal_connection_attempt = 0; - } - WalConnectionEvent::NewWal(new_wal_data) => { - self.wal_connection_attempt = 0; - connection_data.last_wal_receiver_data = - Some((new_wal_data, Utc::now().naive_utc())); - } - WalConnectionEvent::End(wal_receiver_result) => { - match wal_receiver_result { - Ok(()) => { - info!("WAL receiver task finished, reconnecting"); - self.wal_connection_attempt = 0; - } - Err(e) => { - error!("WAL receiver task failed: {e:#}, reconnecting"); - self.wal_connection_attempt += 1; - } - } - self.close_connection().await; - } - } - - ControlFlow::Continue(()) - } - - /// Shuts down current connection (if any), waiting for it to finish. - async fn close_connection(&mut self) { - if let Some(data) = self.wal_connection_data.as_mut() { - match data.connection.shutdown().await { - Err(e) => { - error!("Failed to shutdown wal receiver connection: {e:#}"); - } - Ok(()) => self.wal_connection_data = None, - } - } - } - - /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. - async fn change_connection( - &mut self, - new_safekeeper_id: NodeId, - new_wal_producer_connstr: String, - ) { - self.close_connection().await; - self.wal_connection_data = Some(WalConnectionData { - safekeeper_id: new_safekeeper_id, - connection: WalReceiverConnection::open( - self.id, - new_safekeeper_id, - new_wal_producer_connstr, - self.wal_connect_timeout, - ), - connection_init_time: Utc::now().naive_utc(), - last_wal_receiver_data: None, - }); - } - - /// Checks current state against every fetched safekeeper state of a given timeline. - /// Returns a new candidate, if the current state is somewhat lagging, or `None` otherwise. - /// The current rules for approving new candidates: - /// * pick from the input data from etcd for currently connected safekeeper (if any) - /// * out of the rest input entries, pick one with biggest `commit_lsn` that's after than pageserver's latest Lsn for the timeline - /// * if there's no such entry, no new candidate found, abort - /// * otherwise, check if etcd updates contain currently connected safekeeper - /// * if not, that means no WAL updates happened after certain time (either none since the connection time or none since the last event after the connection) - /// Reconnect if the time exceeds the threshold. - /// * if there's one, compare its Lsn with the other candidate's, reconnect if candidate's over threshold - /// - /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently. - /// Both thresholds are configured per tenant. - fn select_connection_candidate( - &self, - mut safekeeper_timelines: HashMap, - ) -> Option { - let current_sk_data_updated = - self.wal_connection_data - .as_ref() - .and_then(|connection_data| { - safekeeper_timelines.remove(&connection_data.safekeeper_id) - }); - - let candidate_sk_data = safekeeper_timelines - .iter() - .filter(|(_, info)| { - info.commit_lsn > Some(self.timeline.tline.get_last_record_lsn()) - }) - .filter_map(|(sk_id, info)| { - match wal_stream_connection_string( - self.id, - info.safekeeper_connstr.as_deref()?, - ) { - Ok(connstr) => Some((sk_id, info, connstr)), - Err(e) => { - error!("Failed to create wal receiver connection string from broker data of safekeeper node {sk_id}: {e:#}"); - None - } - } - }) - .max_by_key(|(_, info, _)| info.commit_lsn); - - match (current_sk_data_updated, candidate_sk_data) { - // No better candidate than one we're already connected to: - // whatever data update comes for the connected one, we don't have a better candidate - (_, None) => None, - - // No updates from the old SK in this batch, but some candidate is available: - // check how long time ago did we receive updates from the current SK, switch connections in case it's over the threshold - (None, Some((&new_sk_id, _, new_wal_producer_connstr))) => { - match self.wal_connection_data.as_ref() { - Some(current_connection) => { - let last_sk_interaction_time = - match current_connection.last_wal_receiver_data.as_ref() { - Some((_, data_submission_time)) => *data_submission_time, - None => current_connection.connection_init_time, - }; - - let now = Utc::now().naive_utc(); - match (now - last_sk_interaction_time).to_std() { - Ok(last_interaction) => { - if last_interaction > self.lagging_wal_timeout { - return Some(NewWalConnectionCandidate { - safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, - reason: ReconnectReason::NoWalTimeout { - last_wal_interaction: last_sk_interaction_time, - check_time: now, - threshold: self.lagging_wal_timeout, - }, - }); - } - } - Err(_e) => { - warn!("Last interaction with safekeeper {} happened in the future, ignoring the candidate. Interaction time: {last_sk_interaction_time}, now: {now}", current_connection.safekeeper_id); - } - } - None - } - None => Some(NewWalConnectionCandidate { - safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, - reason: ReconnectReason::NoExistingConnection, - }), - } - } - // Both current SK got updated via etcd and there's another candidate with suitable Lsn: - // check how bigger the new SK Lsn is in the future compared to the current SK, switch connections in case it's over the threshold - ( - Some(current_sk_timeline), - Some((&new_sk_id, new_sk_timeline, new_wal_producer_connstr)), - ) => { - let new_lsn = new_sk_timeline.commit_lsn.unwrap_or(Lsn(0)); - let current_lsn = current_sk_timeline.commit_lsn.unwrap_or(Lsn(0)); - match new_lsn.0.checked_sub(current_lsn.0) - { - Some(new_sk_lsn_advantage) => { - if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() { - return Some( - NewWalConnectionCandidate { - safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, - reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag }, - }); - } - } - None => debug!("Best SK candidate has its commit Lsn behind the current timeline's latest consistent Lsn"), - } - - None - } - } - } -} - -fn wal_stream_connection_string( - ZTenantTimelineId { - tenant_id, - timeline_id, - }: ZTenantTimelineId, - listen_pg_addr_str: &str, -) -> anyhow::Result { - let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db"); - let me_conf = sk_connstr - .parse::() - .with_context(|| { - format!("Failed to parse pageserver connection string '{sk_connstr}' as a postgres one") - })?; - let (host, port) = utils::connstring::connection_host_port(&me_conf); - Ok(format!( - "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'" - )) -} - -#[cfg(test)] -mod tests { - use std::time::SystemTime; - - use crate::repository::{ - repo_harness::{RepoHarness, TIMELINE_ID}, - Repository, - }; - - use super::*; - - #[test] - fn no_connection_no_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("no_connection_no_candidate")?; - let mut data_manager_with_no_connection = dummy_wal_connection_manager(&harness); - data_manager_with_no_connection.wal_connection_data = None; - - let no_candidate = - data_manager_with_no_connection.select_connection_candidate(HashMap::from([ - ( - NodeId(0), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(1)), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: None, - }, - ), - ( - NodeId(2), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: None, - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("no commit_lsn".to_string()), - }, - ), - ( - NodeId(3), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: None, - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("no commit_lsn".to_string()), - }, - ), - ])); - - assert!( - no_candidate.is_none(), - "Expected no candidate selected out of non full data options, but got {no_candidate:?}" - ); - - Ok(()) - } - - #[tokio::test] - async fn connection_no_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("connection_no_candidate")?; - - let current_lsn = 100_000; - let connected_sk_id = NodeId(0); - let mut data_manager_with_connection = dummy_wal_connection_manager(&harness); - let mut dummy_connection_data = dummy_connection_data( - ZTenantTimelineId { - tenant_id: harness.tenant_id, - timeline_id: TIMELINE_ID, - }, - connected_sk_id, - ) - .await; - let now = Utc::now().naive_utc(); - dummy_connection_data.last_wal_receiver_data = Some(( - ReplicationFeedback { - current_timeline_size: 1, - ps_writelsn: 1, - ps_applylsn: current_lsn, - ps_flushlsn: 1, - ps_replytime: SystemTime::now(), - }, - now, - )); - dummy_connection_data.connection_init_time = now; - data_manager_with_connection.wal_connection_data = Some(dummy_connection_data); - - let no_candidate = - data_manager_with_connection.select_connection_candidate(HashMap::from([ - ( - connected_sk_id, - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn( - current_lsn + data_manager_with_connection.max_lsn_wal_lag.get() * 2 - )), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - ), - ( - NodeId(1), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(current_lsn)), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("not advanced Lsn".to_string()), - }, - ), - ( - NodeId(2), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn( - current_lsn + data_manager_with_connection.max_lsn_wal_lag.get() / 2 - )), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("not enough advanced Lsn".to_string()), - }, - ), - ])); - - assert!( - no_candidate.is_none(), - "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}" - ); - - Ok(()) - } - - #[test] - fn no_connection_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("no_connection_candidate")?; - let mut data_manager_with_no_connection = dummy_wal_connection_manager(&harness); - data_manager_with_no_connection.wal_connection_data = None; - - let only_candidate = data_manager_with_no_connection - .select_connection_candidate(HashMap::from([( - NodeId(0), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(1 + data_manager_with_no_connection - .max_lsn_wal_lag - .get())), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - )])) - .expect("Expected one candidate selected out of the only data option, but got none"); - assert_eq!(only_candidate.safekeeper_id, NodeId(0)); - assert_eq!( - only_candidate.reason, - ReconnectReason::NoExistingConnection, - "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" - ); - assert!(only_candidate - .wal_producer_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); - - let selected_lsn = 100_000; - let biggest_wal_candidate = data_manager_with_no_connection - .select_connection_candidate(HashMap::from([ - ( - NodeId(0), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(selected_lsn - 100)), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("smaller commit_lsn".to_string()), - }, - ), - ( - NodeId(1), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(selected_lsn)), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - ), - ( - NodeId(2), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(selected_lsn + 100)), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: None, - }, - ), - ])) - .expect( - "Expected one candidate selected out of multiple valid data options, but got none", - ); - - assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1)); - assert_eq!( - biggest_wal_candidate.reason, - ReconnectReason::NoExistingConnection, - "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" - ); - assert!(biggest_wal_candidate - .wal_producer_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); - - Ok(()) - } - - #[tokio::test] - async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("lsn_wal_over_threshcurrent_candidate")?; - let current_lsn = Lsn(100_000).align(); - - let id = ZTenantTimelineId { - tenant_id: harness.tenant_id, - timeline_id: TIMELINE_ID, - }; - - let mut data_manager_with_connection = dummy_wal_connection_manager(&harness); - let connected_sk_id = NodeId(0); - let mut dummy_connection_data = dummy_connection_data(id, connected_sk_id).await; - let lagging_wal_timeout = - chrono::Duration::from_std(data_manager_with_connection.lagging_wal_timeout)?; - let time_over_threshold = - Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; - dummy_connection_data.last_wal_receiver_data = Some(( - ReplicationFeedback { - current_timeline_size: 1, - ps_writelsn: current_lsn.0, - ps_applylsn: 1, - ps_flushlsn: 1, - ps_replytime: SystemTime::now(), - }, - time_over_threshold, - )); - dummy_connection_data.connection_init_time = time_over_threshold; - data_manager_with_connection.wal_connection_data = Some(dummy_connection_data); - - let new_lsn = Lsn(current_lsn.0 + data_manager_with_connection.max_lsn_wal_lag.get() + 1); - let candidates = HashMap::from([ - ( - connected_sk_id, - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(current_lsn), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - ), - ( - NodeId(1), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(new_lsn), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()), - }, - ), - ]); - - let over_threshcurrent_candidate = data_manager_with_connection - .select_connection_candidate(candidates) - .expect( - "Expected one candidate selected out of multiple valid data options, but got none", - ); - - assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1)); - assert_eq!( - over_threshcurrent_candidate.reason, - ReconnectReason::LaggingWal { - current_lsn, - new_lsn, - threshold: data_manager_with_connection.max_lsn_wal_lag - }, - "Should select bigger WAL safekeeper if it starts to lag enough" - ); - assert!(over_threshcurrent_candidate - .wal_producer_connstr - .contains("advanced by Lsn safekeeper")); - - Ok(()) - } - - #[tokio::test] - async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?; - let current_lsn = Lsn(100_000).align(); - - let id = ZTenantTimelineId { - tenant_id: harness.tenant_id, - timeline_id: TIMELINE_ID, - }; - - let mut data_manager_with_connection = dummy_wal_connection_manager(&harness); - let mut dummy_connection_data = dummy_connection_data(id, NodeId(1)).await; - let lagging_wal_timeout = - chrono::Duration::from_std(data_manager_with_connection.lagging_wal_timeout)?; - let time_over_threshold = - Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; - dummy_connection_data.last_wal_receiver_data = None; - dummy_connection_data.connection_init_time = time_over_threshold; - data_manager_with_connection.wal_connection_data = Some(dummy_connection_data); - - let over_threshcurrent_candidate = data_manager_with_connection - .select_connection_candidate(HashMap::from([( - NodeId(0), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(current_lsn), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - )])) - .expect( - "Expected one candidate selected out of multiple valid data options, but got none", - ); - - assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0)); - match over_threshcurrent_candidate.reason { - ReconnectReason::NoWalTimeout { - last_wal_interaction, - threshold, - .. - } => { - assert_eq!(last_wal_interaction, time_over_threshold); - assert_eq!(threshold, data_manager_with_connection.lagging_wal_timeout); - } - unexpected => panic!("Unexpected reason: {unexpected:?}"), - } - assert!(over_threshcurrent_candidate - .wal_producer_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); - - Ok(()) - } - - fn dummy_wal_connection_manager(harness: &RepoHarness) -> WalConnectionManager { - WalConnectionManager { - id: ZTenantTimelineId { - tenant_id: harness.tenant_id, - timeline_id: TIMELINE_ID, - }, - timeline: Arc::new(DatadirTimelineImpl::new( - harness - .load() - .create_empty_timeline(TIMELINE_ID, Lsn(0)) - .expect("Failed to create an empty timeline for dummy wal connection manager"), - 10_000, - )), - wal_connect_timeout: Duration::from_secs(1), - lagging_wal_timeout: Duration::from_secs(10), - max_lsn_wal_lag: NonZeroU64::new(300_000).unwrap(), - wal_connection_attempt: 0, - wal_connection_data: None, - } - } - - const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr"; - - // the function itself does not need async, but it spawns a tokio::task underneath hence neeed - // a runtime to not to panic - async fn dummy_connection_data( - id: ZTenantTimelineId, - safekeeper_id: NodeId, - ) -> WalConnectionData { - let dummy_connstr = wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR) - .expect("Failed to construct dummy wal producer connstr"); - WalConnectionData { - safekeeper_id, - connection: WalReceiverConnection::open( - id, - safekeeper_id, - dummy_connstr, - Duration::from_secs(1), - ), - connection_init_time: Utc::now().naive_utc(), - last_wal_receiver_data: None, - } - } -} diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs new file mode 100644 index 0000000000..d5ca1d5159 --- /dev/null +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -0,0 +1,1133 @@ +//! WAL receiver logic that ensures the pageserver gets connectected to safekeeper, +//! that contains the latest WAL to stream and this connection does not go stale. +//! +//! To achieve that, a etcd broker is used: safekepers propagate their timelines' state in it, +//! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection. +//! Current connection state is tracked too, to ensure it's not getting stale. +//! +//! After every connection or etcd update fetched, the state gets updated correspondingly and rechecked for the new conneciton leader, +//! then a [re]connection happens, if necessary. +//! Only WAL streaming task expects to be finished, other loops (etcd, connection management) never exit unless cancelled explicitly via the dedicated channel. + +use std::{ + collections::{hash_map, HashMap}, + num::NonZeroU64, + sync::Arc, + time::Duration, +}; + +use anyhow::Context; +use chrono::{DateTime, Local, NaiveDateTime, Utc}; +use etcd_broker::{ + subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, + BrokerUpdate, Client, +}; +use tokio::select; +use tracing::*; + +use crate::DatadirTimelineImpl; +use utils::{ + lsn::Lsn, + pq_proto::ReplicationFeedback, + zid::{NodeId, ZTenantTimelineId}, +}; + +use super::{TaskEvent, TaskHandle}; + +/// Spawns the loop to take care of the timeline's WAL streaming connection. +pub(super) fn spawn_connection_manager_task( + id: ZTenantTimelineId, + broker_loop_prefix: String, + mut client: Client, + local_timeline: Arc, + wal_connect_timeout: Duration, + lagging_wal_timeout: Duration, + max_lsn_wal_lag: NonZeroU64, +) -> TaskHandle<()> { + TaskHandle::spawn(move |_, mut cancellation| { + async move { + info!("WAL receiver broker started, connecting to etcd"); + let mut walreceiver_state = WalreceiverState::new( + id, + local_timeline, + wal_connect_timeout, + lagging_wal_timeout, + max_lsn_wal_lag, + ); + loop { + select! { + _ = cancellation.changed() => { + info!("Broker subscription init cancelled, shutting down"); + if let Some(wal_connection) = walreceiver_state.wal_connection.take() + { + wal_connection.connection_task.shutdown().await; + } + return Ok(()); + }, + + _ = connection_manager_loop_step( + &broker_loop_prefix, + &mut client, + &mut walreceiver_state, + ) => {}, + } + } + } + .instrument(info_span!("wal_connection_manager", id = %id)) + }) +} + +/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker. +/// Based on the updates, desides whether to start, keep or stop a WAL receiver task. +/// If etcd subscription is cancelled, exits. +async fn connection_manager_loop_step( + broker_prefix: &str, + etcd_client: &mut Client, + walreceiver_state: &mut WalreceiverState, +) { + let id = walreceiver_state.id; + + // XXX: We never explicitly cancel etcd task, instead establishing one and never letting it go, + // running the entire loop step as much as possible to an end. + // The task removal happens implicitly on drop, both aborting the etcd subscription task and dropping the receiver channel end, + // forcing the etcd subscription to exit either way. + let mut broker_subscription = + subscribe_for_timeline_updates(etcd_client, broker_prefix, id).await; + info!("Subscribed for etcd timeline changes, waiting for new etcd data"); + + loop { + select! { + broker_connection_result = &mut broker_subscription.watcher_handle => { + cleanup_broker_connection(broker_connection_result, walreceiver_state); + return; + }, + + Some(wal_connection_update) = async { + match walreceiver_state.wal_connection.as_mut() { + Some(wal_connection) => { + let receiver = &mut wal_connection.connection_task.events_receiver; + Some(match receiver.changed().await { + Ok(()) => receiver.borrow().clone(), + Err(_cancellation_error) => TaskEvent::End(Ok(())), + }) + } + None => None, + } + } => { + let (connection_update, reset_connection_attempts) = match &wal_connection_update { + TaskEvent::Started => (Some(Utc::now().naive_utc()), true), + TaskEvent::NewEvent(replication_feedback) => (Some(DateTime::::from(replication_feedback.ps_replytime).naive_utc()), true), + TaskEvent::End(end_result) => { + let should_reset_connection_attempts = match end_result { + Ok(()) => { + debug!("WAL receiving task finished"); + true + }, + Err(e) => { + warn!("WAL receiving task failed: {e}"); + false + }, + }; + walreceiver_state.wal_connection = None; + (None, should_reset_connection_attempts) + }, + }; + + if let Some(connection_update) = connection_update { + match &mut walreceiver_state.wal_connection { + Some(wal_connection) => { + wal_connection.latest_connection_update = connection_update; + + let attempts_entry = walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0); + if reset_connection_attempts { + *attempts_entry = 0; + } else { + *attempts_entry += 1; + } + }, + None => error!("Received connection update for WAL connection that is not active, update: {wal_connection_update:?}"), + } + } + }, + + broker_update = broker_subscription.value_updates.recv() => { + match broker_update { + Some(broker_update) => walreceiver_state.register_timeline_update(broker_update), + None => { + info!("Broker sender end was dropped, ending current broker loop step"); + // Ensure to cancel and wait for the broker subscription task end, to log its result. + // Broker sender end is in the broker subscription task and its drop means abnormal task completion. + // First, ensure that the task is stopped (abort can be done without errors on already stopped tasks and repeated multiple times). + broker_subscription.watcher_handle.abort(); + // Then, wait for the task to finish and print its result. If the task was finished before abort (which we assume in this abnormal case), + // a proper error message will be printed, otherwise an abortion message is printed which is ok, since we're signalled to finish anyway. + cleanup_broker_connection( + (&mut broker_subscription.watcher_handle).await, + walreceiver_state, + ); + return; + } + } + }, + } + + // Fetch more etcd timeline updates, but limit ourselves since they may arrive quickly. + let mut max_events_to_poll = 100_u32; + while max_events_to_poll > 0 { + if let Ok(broker_update) = broker_subscription.value_updates.try_recv() { + walreceiver_state.register_timeline_update(broker_update); + max_events_to_poll -= 1; + } else { + break; + } + } + + if let Some(new_candidate) = walreceiver_state.next_connection_candidate() { + info!("Switching to new connection candidate: {new_candidate:?}"); + walreceiver_state + .change_connection( + new_candidate.safekeeper_id, + new_candidate.wal_producer_connstr, + ) + .await + } + } +} + +fn cleanup_broker_connection( + broker_connection_result: Result, tokio::task::JoinError>, + walreceiver_state: &mut WalreceiverState, +) { + match broker_connection_result { + Ok(Ok(())) => info!("Broker conneciton task finished, ending current broker loop step"), + Ok(Err(broker_error)) => warn!("Broker conneciton ended with error: {broker_error}"), + Err(abort_error) => { + if abort_error.is_panic() { + error!("Broker connection panicked: {abort_error}") + } else { + debug!("Broker connection aborted: {abort_error}") + } + } + } + + walreceiver_state.wal_stream_candidates.clear(); +} + +/// Endlessly try to subscribe for broker updates for a given timeline. +/// If there are no safekeepers to maintain the lease, the timeline subscription will be unavailable in the broker and the operation will fail constantly. +/// This is ok, pageservers should anyway try subscribing (with some backoff) since it's the only way they can get the timeline WAL anyway. +async fn subscribe_for_timeline_updates( + etcd_client: &mut Client, + broker_prefix: &str, + id: ZTenantTimelineId, +) -> BrokerSubscription { + let mut attempt = 0; + loop { + exponential_backoff( + attempt, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + ) + .await; + attempt += 1; + + match etcd_broker::subscribe_for_json_values( + etcd_client, + SubscriptionKey::sk_timeline_info(broker_prefix.to_owned(), id), + ) + .instrument(info_span!("etcd_subscription")) + .await + { + Ok(new_subscription) => { + return new_subscription; + } + Err(e) => { + warn!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in etcd: {e:#}"); + continue; + } + } + } +} + +const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 2.0; +const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 60.0; + +async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) { + if n == 0 { + return; + } + let seconds_to_wait = base.powf(f64::from(n) - 1.0).min(max_seconds); + info!("Backoff: waiting {seconds_to_wait} seconds before proceeding with the task"); + tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; +} + +/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible. +struct WalreceiverState { + id: ZTenantTimelineId, + /// Use pageserver data about the timeline to filter out some of the safekeepers. + local_timeline: Arc, + /// The timeout on the connection to safekeeper for WAL streaming. + wal_connect_timeout: Duration, + /// The timeout to use to determine when the current connection is "stale" and reconnect to the other one. + lagging_wal_timeout: Duration, + /// The Lsn lag to use to determine when the current connection is lagging to much behind and reconnect to the other one. + max_lsn_wal_lag: NonZeroU64, + /// Current connection to safekeeper for WAL streaming. + wal_connection: Option, + wal_connection_attempts: HashMap, + /// Data about all timelines, available for connection, fetched from etcd, grouped by their corresponding safekeeper node id. + wal_stream_candidates: HashMap, +} + +/// Current connection data. +#[derive(Debug)] +struct WalConnection { + /// Current safekeeper pageserver is connected to for WAL streaming. + sk_id: NodeId, + /// Connection task start time or the timestamp of a latest connection message received. + latest_connection_update: NaiveDateTime, + /// WAL streaming task handle. + connection_task: TaskHandle, +} + +/// Data about the timeline to connect to, received from etcd. +#[derive(Debug)] +struct EtcdSkTimeline { + timeline: SkTimelineInfo, + /// Etcd generation, the bigger it is, the more up to date the timeline data is. + etcd_version: i64, + /// Time at which the data was fetched from etcd last time, to track the stale data. + latest_update: NaiveDateTime, +} + +impl WalreceiverState { + fn new( + id: ZTenantTimelineId, + local_timeline: Arc, + wal_connect_timeout: Duration, + lagging_wal_timeout: Duration, + max_lsn_wal_lag: NonZeroU64, + ) -> Self { + Self { + id, + local_timeline, + wal_connect_timeout, + lagging_wal_timeout, + max_lsn_wal_lag, + wal_connection: None, + wal_stream_candidates: HashMap::new(), + wal_connection_attempts: HashMap::new(), + } + } + + /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. + async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_producer_connstr: String) { + if let Some(old_connection) = self.wal_connection.take() { + old_connection.connection_task.shutdown().await + } + + let id = self.id; + let connect_timeout = self.wal_connect_timeout; + let connection_attempt = self + .wal_connection_attempts + .get(&new_sk_id) + .copied() + .unwrap_or(0); + let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| { + async move { + exponential_backoff( + connection_attempt, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + ) + .await; + super::walreceiver_connection::handle_walreceiver_connection( + id, + &new_wal_producer_connstr, + events_sender.as_ref(), + cancellation, + connect_timeout, + ) + .await + .map_err(|e| format!("walreceiver connection handling failure: {e:#}")) + } + .instrument(info_span!("walreceiver_connection", id = %id)) + }); + + self.wal_connection = Some(WalConnection { + sk_id: new_sk_id, + latest_connection_update: Utc::now().naive_utc(), + connection_task: connection_handle, + }); + } + + /// Adds another etcd timeline into the state, if its more recent than the one already added there for the same key. + fn register_timeline_update(&mut self, timeline_update: BrokerUpdate) { + match self + .wal_stream_candidates + .entry(timeline_update.key.node_id) + { + hash_map::Entry::Occupied(mut o) => { + let existing_value = o.get_mut(); + if existing_value.etcd_version < timeline_update.etcd_version { + existing_value.etcd_version = timeline_update.etcd_version; + existing_value.timeline = timeline_update.value; + existing_value.latest_update = Utc::now().naive_utc(); + } + } + hash_map::Entry::Vacant(v) => { + v.insert(EtcdSkTimeline { + timeline: timeline_update.value, + etcd_version: timeline_update.etcd_version, + latest_update: Utc::now().naive_utc(), + }); + } + } + } + + /// Cleans up stale etcd records and checks the rest for the new connection candidate. + /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise. + /// The current rules for approving new candidates: + /// * pick from the input data from etcd for currently connected safekeeper (if any) + /// * out of the rest input entries, pick one with biggest `commit_lsn` that's after than pageserver's latest Lsn for the timeline + /// * if there's no such entry, no new candidate found, abort + /// * check the current connection time data for staleness, reconnect if stale + /// * otherwise, check if etcd updates contain currently connected safekeeper + /// * if not, that means no WAL updates happened after certain time (either none since the connection time or none since the last event after the connection) + /// Reconnect if the time exceeds the threshold. + /// * if there's one, compare its Lsn with the other candidate's, reconnect if candidate's over threshold + /// + /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently. + /// Both thresholds are configured per tenant. + fn next_connection_candidate(&mut self) -> Option { + self.cleanup_old_candidates(); + + match &self.wal_connection { + Some(existing_wal_connection) => { + let connected_sk_node = existing_wal_connection.sk_id; + + let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) = self + .applicable_connection_candidates() + .filter(|&(sk_id, _, _)| sk_id != connected_sk_node) + .max_by_key(|(_, info, _)| info.commit_lsn)?; + + let now = Utc::now().naive_utc(); + if let Ok(latest_interaciton) = + (now - existing_wal_connection.latest_connection_update).to_std() + { + if latest_interaciton > self.lagging_wal_timeout { + return Some(NewWalConnectionCandidate { + safekeeper_id: new_sk_id, + wal_producer_connstr: new_wal_producer_connstr, + reason: ReconnectReason::NoWalTimeout { + last_wal_interaction: Some( + existing_wal_connection.latest_connection_update, + ), + check_time: now, + threshold: self.lagging_wal_timeout, + }, + }); + } + } + + match self.wal_stream_candidates.get(&connected_sk_node) { + Some(current_connection_etcd_data) => { + let new_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0)); + let current_lsn = current_connection_etcd_data + .timeline + .commit_lsn + .unwrap_or(Lsn(0)); + match new_lsn.0.checked_sub(current_lsn.0) + { + Some(new_sk_lsn_advantage) => { + if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() { + return Some( + NewWalConnectionCandidate { + safekeeper_id: new_sk_id, + wal_producer_connstr: new_wal_producer_connstr, + reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag }, + }); + } + } + None => debug!("Best SK candidate has its commit Lsn behind the current timeline's latest consistent Lsn"), + } + } + None => { + return Some(NewWalConnectionCandidate { + safekeeper_id: new_sk_id, + wal_producer_connstr: new_wal_producer_connstr, + reason: ReconnectReason::NoEtcdDataForExistingConnection, + }) + } + } + } + None => { + let (new_sk_id, _, new_wal_producer_connstr) = self + .applicable_connection_candidates() + .max_by_key(|(_, info, _)| info.commit_lsn)?; + return Some(NewWalConnectionCandidate { + safekeeper_id: new_sk_id, + wal_producer_connstr: new_wal_producer_connstr, + reason: ReconnectReason::NoExistingConnection, + }); + } + } + + None + } + + fn applicable_connection_candidates( + &self, + ) -> impl Iterator { + self.wal_stream_candidates + .iter() + .filter(|(_, etcd_info)| { + etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn()) + }) + .filter_map(|(sk_id, etcd_info)| { + let info = &etcd_info.timeline; + match wal_stream_connection_string( + self.id, + info.safekeeper_connstr.as_deref()?, + ) { + Ok(connstr) => Some((*sk_id, info, connstr)), + Err(e) => { + error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id); + None + } + } + }) + } + + fn cleanup_old_candidates(&mut self) { + self.wal_stream_candidates.retain(|_, etcd_info| { + if let Ok(time_since_latest_etcd_update) = + (Utc::now().naive_utc() - etcd_info.latest_update).to_std() + { + time_since_latest_etcd_update < self.lagging_wal_timeout + } else { + true + } + }); + } +} + +#[derive(Debug, PartialEq, Eq)] +struct NewWalConnectionCandidate { + safekeeper_id: NodeId, + wal_producer_connstr: String, + reason: ReconnectReason, +} + +/// Stores the reason why WAL connection was switched, for furter debugging purposes. +#[derive(Debug, PartialEq, Eq)] +enum ReconnectReason { + NoExistingConnection, + NoEtcdDataForExistingConnection, + LaggingWal { + current_lsn: Lsn, + new_lsn: Lsn, + threshold: NonZeroU64, + }, + NoWalTimeout { + last_wal_interaction: Option, + check_time: NaiveDateTime, + threshold: Duration, + }, +} + +fn wal_stream_connection_string( + ZTenantTimelineId { + tenant_id, + timeline_id, + }: ZTenantTimelineId, + listen_pg_addr_str: &str, +) -> anyhow::Result { + let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db"); + let me_conf = sk_connstr + .parse::() + .with_context(|| { + format!("Failed to parse pageserver connection string '{sk_connstr}' as a postgres one") + })?; + let (host, port) = utils::connstring::connection_host_port(&me_conf); + Ok(format!( + "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'" + )) +} + +#[cfg(test)] +mod tests { + use std::time::SystemTime; + + use crate::repository::{ + repo_harness::{RepoHarness, TIMELINE_ID}, + Repository, + }; + + use super::*; + + #[test] + fn no_connection_no_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("no_connection_no_candidate")?; + let mut state = dummy_state(&harness); + let now = Utc::now().naive_utc(); + + let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?; + let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout; + + state.wal_connection = None; + state.wal_stream_candidates = HashMap::from([ + ( + NodeId(0), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(1)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: None, + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(1), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: None, + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("no commit_lsn".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(2), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: None, + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("no commit_lsn".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(3), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: delay_over_threshold, + }, + ), + ]); + + let no_candidate = state.next_connection_candidate(); + assert!( + no_candidate.is_none(), + "Expected no candidate selected out of non full data options, but got {no_candidate:?}" + ); + + Ok(()) + } + + #[tokio::test] + async fn connection_no_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("connection_no_candidate")?; + let mut state = dummy_state(&harness); + let now = Utc::now().naive_utc(); + + let connected_sk_id = NodeId(0); + let current_lsn = 100_000; + + state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap(); + state.wal_connection = Some(WalConnection { + sk_id: connected_sk_id, + latest_connection_update: now, + connection_task: TaskHandle::spawn(move |sender, _| async move { + sender + .send(TaskEvent::NewEvent(ReplicationFeedback { + current_timeline_size: 1, + ps_writelsn: 1, + ps_applylsn: current_lsn, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + })) + .ok(); + Ok(()) + }), + }); + state.wal_stream_candidates = HashMap::from([ + ( + connected_sk_id, + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(current_lsn + state.max_lsn_wal_lag.get() * 2)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(1), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(current_lsn)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("not advanced Lsn".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(2), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(current_lsn + state.max_lsn_wal_lag.get() / 2)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("not enough advanced Lsn".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ]); + + let no_candidate = state.next_connection_candidate(); + assert!( + no_candidate.is_none(), + "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}" + ); + + Ok(()) + } + + #[test] + fn no_connection_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("no_connection_candidate")?; + let mut state = dummy_state(&harness); + let now = Utc::now().naive_utc(); + + state.wal_connection = None; + state.wal_stream_candidates = HashMap::from([( + NodeId(0), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + )]); + + let only_candidate = state + .next_connection_candidate() + .expect("Expected one candidate selected out of the only data option, but got none"); + assert_eq!(only_candidate.safekeeper_id, NodeId(0)); + assert_eq!( + only_candidate.reason, + ReconnectReason::NoExistingConnection, + "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" + ); + assert!(only_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + + let selected_lsn = 100_000; + state.wal_stream_candidates = HashMap::from([ + ( + NodeId(0), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(selected_lsn - 100)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("smaller commit_lsn".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(1), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(selected_lsn)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(2), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(selected_lsn + 100)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: None, + }, + etcd_version: 0, + latest_update: now, + }, + ), + ]); + let biggest_wal_candidate = state.next_connection_candidate().expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1)); + assert_eq!( + biggest_wal_candidate.reason, + ReconnectReason::NoExistingConnection, + "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" + ); + assert!(biggest_wal_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + + Ok(()) + } + + #[tokio::test] + async fn connection_no_etcd_data_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("connection_no_etcd_data_candidate")?; + let mut state = dummy_state(&harness); + + let now = Utc::now().naive_utc(); + let current_lsn = Lsn(100_000).align(); + let connected_sk_id = NodeId(0); + let other_sk_id = NodeId(connected_sk_id.0 + 1); + + state.wal_connection = Some(WalConnection { + sk_id: connected_sk_id, + latest_connection_update: now, + connection_task: TaskHandle::spawn(move |sender, _| async move { + sender + .send(TaskEvent::NewEvent(ReplicationFeedback { + current_timeline_size: 1, + ps_writelsn: current_lsn.0, + ps_applylsn: 1, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + })) + .ok(); + Ok(()) + }), + }); + state.wal_stream_candidates = HashMap::from([( + other_sk_id, + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + )]); + + let only_candidate = state + .next_connection_candidate() + .expect("Expected one candidate selected out of the only data option, but got none"); + assert_eq!(only_candidate.safekeeper_id, other_sk_id); + assert_eq!( + only_candidate.reason, + ReconnectReason::NoEtcdDataForExistingConnection, + "Should select new safekeeper due to missing etcd data, even if there's an existing connection with this safekeeper" + ); + assert!(only_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + + Ok(()) + } + + #[tokio::test] + async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("lsn_wal_over_threshcurrent_candidate")?; + let mut state = dummy_state(&harness); + let current_lsn = Lsn(100_000).align(); + let now = Utc::now().naive_utc(); + + let connected_sk_id = NodeId(0); + let new_lsn = Lsn(current_lsn.0 + state.max_lsn_wal_lag.get() + 1); + + state.wal_connection = Some(WalConnection { + sk_id: connected_sk_id, + latest_connection_update: now, + connection_task: TaskHandle::spawn(move |sender, _| async move { + sender + .send(TaskEvent::NewEvent(ReplicationFeedback { + current_timeline_size: 1, + ps_writelsn: current_lsn.0, + ps_applylsn: 1, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + })) + .ok(); + Ok(()) + }), + }); + state.wal_stream_candidates = HashMap::from([ + ( + connected_sk_id, + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(current_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(1), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(new_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ]); + + let over_threshcurrent_candidate = state.next_connection_candidate().expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1)); + assert_eq!( + over_threshcurrent_candidate.reason, + ReconnectReason::LaggingWal { + current_lsn, + new_lsn, + threshold: state.max_lsn_wal_lag + }, + "Should select bigger WAL safekeeper if it starts to lag enough" + ); + assert!(over_threshcurrent_candidate + .wal_producer_connstr + .contains("advanced by Lsn safekeeper")); + + Ok(()) + } + + #[tokio::test] + async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?; + let mut state = dummy_state(&harness); + let current_lsn = Lsn(100_000).align(); + let now = Utc::now().naive_utc(); + + let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?; + let time_over_threshold = + Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; + + state.wal_connection = Some(WalConnection { + sk_id: NodeId(1), + latest_connection_update: time_over_threshold, + connection_task: TaskHandle::spawn(move |sender, _| async move { + sender + .send(TaskEvent::NewEvent(ReplicationFeedback { + current_timeline_size: 1, + ps_writelsn: current_lsn.0, + ps_applylsn: 1, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + })) + .ok(); + Ok(()) + }), + }); + state.wal_stream_candidates = HashMap::from([( + NodeId(0), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(current_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + )]); + + let over_threshcurrent_candidate = state.next_connection_candidate().expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0)); + match over_threshcurrent_candidate.reason { + ReconnectReason::NoWalTimeout { + last_wal_interaction, + threshold, + .. + } => { + assert_eq!(last_wal_interaction, Some(time_over_threshold)); + assert_eq!(threshold, state.lagging_wal_timeout); + } + unexpected => panic!("Unexpected reason: {unexpected:?}"), + } + assert!(over_threshcurrent_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + + Ok(()) + } + + #[tokio::test] + async fn timeout_connection_over_threshhold_current_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("timeout_connection_over_threshhold_current_candidate")?; + let mut state = dummy_state(&harness); + let current_lsn = Lsn(100_000).align(); + let now = Utc::now().naive_utc(); + + let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?; + let time_over_threshold = + Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; + + state.wal_connection = Some(WalConnection { + sk_id: NodeId(1), + latest_connection_update: time_over_threshold, + connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }), + }); + state.wal_stream_candidates = HashMap::from([( + NodeId(0), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(current_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + )]); + + let over_threshcurrent_candidate = state.next_connection_candidate().expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0)); + match over_threshcurrent_candidate.reason { + ReconnectReason::NoWalTimeout { + last_wal_interaction, + threshold, + .. + } => { + assert_eq!(last_wal_interaction, Some(time_over_threshold)); + assert_eq!(threshold, state.lagging_wal_timeout); + } + unexpected => panic!("Unexpected reason: {unexpected:?}"), + } + assert!(over_threshcurrent_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + + Ok(()) + } + + const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr"; + + fn dummy_state(harness: &RepoHarness) -> WalreceiverState { + WalreceiverState { + id: ZTenantTimelineId { + tenant_id: harness.tenant_id, + timeline_id: TIMELINE_ID, + }, + local_timeline: Arc::new(DatadirTimelineImpl::new( + harness + .load() + .create_empty_timeline(TIMELINE_ID, Lsn(0)) + .expect("Failed to create an empty timeline for dummy wal connection manager"), + 10_000, + )), + wal_connect_timeout: Duration::from_secs(1), + lagging_wal_timeout: Duration::from_secs(1), + max_lsn_wal_lag: NonZeroU64::new(1).unwrap(), + wal_connection: None, + wal_stream_candidates: HashMap::new(), + wal_connection_attempts: HashMap::new(), + } + } +} diff --git a/pageserver/src/walreceiver/connection_handler.rs b/pageserver/src/walreceiver/walreceiver_connection.rs similarity index 78% rename from pageserver/src/walreceiver/connection_handler.rs rename to pageserver/src/walreceiver/walreceiver_connection.rs index 97b9b8cc9b..98b36dfe48 100644 --- a/pageserver/src/walreceiver/connection_handler.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -1,5 +1,5 @@ //! Actual Postgres connection handler to stream WAL to the server. -//! Runs as a separate, cancellable Tokio task. + use std::{ str::FromStr, sync::Arc, @@ -10,113 +10,29 @@ use anyhow::{bail, ensure, Context}; use bytes::BytesMut; use fail::fail_point; use postgres::{SimpleQueryMessage, SimpleQueryRow}; -use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use tokio::{pin, select, sync::watch, time}; use tokio_postgres::{replication::ReplicationStream, Client}; use tokio_stream::StreamExt; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; -use utils::{ - lsn::Lsn, - pq_proto::ReplicationFeedback, - zid::{NodeId, ZTenantTimelineId}, -}; +use super::TaskEvent; use crate::{ http::models::WalReceiverEntry, repository::{Repository, Timeline}, tenant_mgr, walingest::WalIngest, }; +use postgres_ffi::waldecoder::WalStreamDecoder; +use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId}; -#[derive(Debug, Clone)] -pub enum WalConnectionEvent { - Started, - NewWal(ReplicationFeedback), - End(Result<(), String>), -} - -/// A wrapper around standalone Tokio task, to poll its updates or cancel the task. -#[derive(Debug)] -pub struct WalReceiverConnection { - handle: tokio::task::JoinHandle<()>, - cancellation: watch::Sender<()>, - events_receiver: watch::Receiver, -} - -impl WalReceiverConnection { - /// Initializes the connection task, returning a set of handles on top of it. - /// The task is started immediately after the creation, fails if no connection is established during the timeout given. - pub fn open( - id: ZTenantTimelineId, - safekeeper_id: NodeId, - wal_producer_connstr: String, - connect_timeout: Duration, - ) -> Self { - let (cancellation, mut cancellation_receiver) = watch::channel(()); - let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started); - - let handle = tokio::spawn( - async move { - let connection_result = handle_walreceiver_connection( - id, - &wal_producer_connstr, - &events_sender, - &mut cancellation_receiver, - connect_timeout, - ) - .await - .map_err(|e| { - format!("Walreceiver connection for id {id} failed with error: {e:#}") - }); - - match &connection_result { - Ok(()) => { - debug!("Walreceiver connection for id {id} ended successfully") - } - Err(e) => warn!("{e}"), - } - events_sender - .send(WalConnectionEvent::End(connection_result)) - .ok(); - } - .instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)), - ); - - Self { - handle, - cancellation, - events_receiver, - } - } - - /// Polls for the next WAL receiver event, if there's any available since the last check. - /// Blocks if there's no new event available, returns `None` if no new events will ever occur. - /// Only the last event is returned, all events received between observatins are lost. - pub async fn next_event(&mut self) -> Option { - match self.events_receiver.changed().await { - Ok(()) => Some(self.events_receiver.borrow().clone()), - Err(_cancellation_error) => None, - } - } - - /// Gracefully aborts current WAL streaming task, waiting for the current WAL streamed. - pub async fn shutdown(&mut self) -> anyhow::Result<()> { - self.cancellation.send(()).ok(); - let handle = &mut self.handle; - handle - .await - .context("Failed to join on a walreceiver connection task")?; - Ok(()) - } -} - -async fn handle_walreceiver_connection( +/// Opens a conneciton to the given wal producer and streams the WAL, sending progress messages during streaming. +pub async fn handle_walreceiver_connection( id: ZTenantTimelineId, wal_producer_connstr: &str, - events_sender: &watch::Sender, - cancellation: &mut watch::Receiver<()>, + events_sender: &watch::Sender>, + mut cancellation: watch::Receiver<()>, connect_timeout: Duration, ) -> anyhow::Result<()> { // Connect to the database in replication mode. @@ -214,8 +130,6 @@ async fn handle_walreceiver_connection( while let Some(replication_message) = { select! { - // check for shutdown first - biased; _ = cancellation.changed() => { info!("walreceiver interrupted"); None @@ -344,7 +258,7 @@ async fn handle_walreceiver_connection( .as_mut() .zenith_status_update(data.len() as u64, &data) .await?; - if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) { + if let Err(e) = events_sender.send(TaskEvent::NewEvent(zenith_status_update)) { warn!("Wal connection event listener dropped, aborting the connection: {e}"); return Ok(()); } diff --git a/proxy/src/auth/backend/console.rs b/proxy/src/auth/backend/console.rs index 93462086ea..3085f0b0e4 100644 --- a/proxy/src/auth/backend/console.rs +++ b/proxy/src/auth/backend/console.rs @@ -49,6 +49,12 @@ impl UserFacingError for ConsoleAuthError { } } +impl From<&auth::credentials::ClientCredsParseError> for ConsoleAuthError { + fn from(e: &auth::credentials::ClientCredsParseError) -> Self { + ConsoleAuthError::BadProjectName(e.clone()) + } +} + // TODO: convert into an enum with "error" #[derive(Serialize, Deserialize, Debug)] struct GetRoleSecretResponse { @@ -94,7 +100,7 @@ impl<'a> Api<'a> { let mut url = self.endpoint.clone(); url.path_segments_mut().push("proxy_get_role_secret"); url.query_pairs_mut() - .append_pair("project", &self.creds.project_name) + .append_pair("project", self.creds.project_name.as_ref()?) .append_pair("role", &self.creds.user); // TODO: use a proper logger @@ -117,8 +123,8 @@ impl<'a> Api<'a> { async fn wake_compute(&self) -> Result { let mut url = self.endpoint.clone(); url.path_segments_mut().push("proxy_wake_compute"); - url.query_pairs_mut() - .append_pair("project", &self.creds.project_name); + let project_name = self.creds.project_name.as_ref()?; + url.query_pairs_mut().append_pair("project", project_name); // TODO: use a proper logger println!("cplane request: {url}"); diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index 48dc8542ec..b5312fbe1f 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -8,7 +8,7 @@ use std::collections::HashMap; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; -#[derive(Debug, Error, PartialEq)] +#[derive(Debug, Error, PartialEq, Eq, Clone)] pub enum ClientCredsParseError { #[error("Parameter `{0}` is missing in startup packet.")] MissingKey(&'static str), @@ -44,7 +44,7 @@ impl UserFacingError for ClientCredsParseError {} pub struct ClientCredentials { pub user: String, pub dbname: String, - pub project_name: String, + pub project_name: Result, } impl ClientCredentials { @@ -67,7 +67,7 @@ impl ClientCredentials { let user = get_param("user")?; let dbname = get_param("database")?; let project_name = get_param("project").ok(); - let project_name = get_project_name(sni_data, common_name, project_name.as_deref())?; + let project_name = get_project_name(sni_data, common_name, project_name.as_deref()); Ok(Self { user, diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 169b106aa9..8e0eb971f3 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -5,6 +5,11 @@ use anyhow::Context; use anyhow::Error; use anyhow::Result; use etcd_broker::subscription_value::SkTimelineInfo; +use etcd_broker::LeaseKeepAliveStream; +use etcd_broker::LeaseKeeper; + +use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::time::Duration; use tokio::spawn; use tokio::task::JoinHandle; @@ -21,7 +26,7 @@ use utils::zid::{NodeId, ZTenantTimelineId}; const RETRY_INTERVAL_MSEC: u64 = 1000; const PUSH_INTERVAL_MSEC: u64 = 1000; -const LEASE_TTL_SEC: i64 = 5; +const LEASE_TTL_SEC: i64 = 10; pub fn thread_main(conf: SafeKeeperConf) { let runtime = runtime::Builder::new_current_thread() @@ -154,13 +159,48 @@ pub fn get_candiate_name(system_id: NodeId) -> String { format!("id_{system_id}") } +async fn push_sk_info( + zttid: ZTenantTimelineId, + mut client: Client, + key: String, + sk_info: SkTimelineInfo, + mut lease: Lease, +) -> anyhow::Result<(ZTenantTimelineId, Lease)> { + let put_opts = PutOptions::new().with_lease(lease.id); + client + .put( + key.clone(), + serde_json::to_string(&sk_info)?, + Some(put_opts), + ) + .await + .with_context(|| format!("failed to push safekeeper info to {}", key))?; + + // revive the lease + lease + .keeper + .keep_alive() + .await + .context("failed to send LeaseKeepAliveRequest")?; + lease + .ka_stream + .message() + .await + .context("failed to receive LeaseKeepAliveResponse")?; + + Ok((zttid, lease)) +} + +struct Lease { + id: i64, + keeper: LeaseKeeper, + ka_stream: LeaseKeepAliveStream, +} + /// Push once in a while data about all active timelines to the broker. async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { let mut client = Client::connect(&conf.broker_endpoints, None).await?; - - // Get and maintain lease to automatically delete obsolete data - let lease = client.lease_grant(LEASE_TTL_SEC, None).await?; - let (mut keeper, mut ka_stream) = client.lease_keep_alive(lease.id()).await?; + let mut leases: HashMap = HashMap::new(); let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); loop { @@ -168,33 +208,46 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { // is under plain mutex. That's ok, all this code is not performance // sensitive and there is no risk of deadlock as we don't await while // lock is held. - for zttid in GlobalTimelines::get_active_timelines() { - if let Some(tli) = GlobalTimelines::get_loaded(zttid) { - let sk_info = tli.get_public_info(&conf)?; - let put_opts = PutOptions::new().with_lease(lease.id()); - client - .put( - timeline_safekeeper_path( - conf.broker_etcd_prefix.clone(), - zttid, - conf.my_id, - ), - serde_json::to_string(&sk_info)?, - Some(put_opts), - ) - .await - .context("failed to push safekeeper info")?; + let active_tlis = GlobalTimelines::get_active_timelines(); + + // // Get and maintain (if not yet) per timeline lease to automatically delete obsolete data. + for zttid in active_tlis.iter() { + if let Entry::Vacant(v) = leases.entry(*zttid) { + let lease = client.lease_grant(LEASE_TTL_SEC, None).await?; + let (keeper, ka_stream) = client.lease_keep_alive(lease.id()).await?; + v.insert(Lease { + id: lease.id(), + keeper, + ka_stream, + }); } } - // revive the lease - keeper - .keep_alive() - .await - .context("failed to send LeaseKeepAliveRequest")?; - ka_stream - .message() - .await - .context("failed to receive LeaseKeepAliveResponse")?; + leases.retain(|zttid, _| active_tlis.contains(zttid)); + + // Push data concurrently to not suffer from latency, with many timelines it can be slow. + let handles = active_tlis + .iter() + .filter_map(|zttid| GlobalTimelines::get_loaded(*zttid)) + .map(|tli| { + let sk_info = tli.get_public_info(&conf); + let key = timeline_safekeeper_path( + conf.broker_etcd_prefix.clone(), + tli.zttid, + conf.my_id, + ); + let lease = leases.remove(&tli.zttid).unwrap(); + tokio::spawn(push_sk_info(tli.zttid, client.clone(), key, sk_info, lease)) + }) + .collect::>(); + for h in handles { + let (zttid, lease) = h.await??; + // It is ugly to pull leases from hash and then put it back, but + // otherwise we have to resort to long living per tli tasks (which + // would generate a lot of errors when etcd is down) as task wants to + // have 'static objects, we can't borrow to it. + leases.insert(zttid, lease); + } + sleep(push_interval).await; } } @@ -221,15 +274,12 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { .await .context("failed to subscribe for safekeeper info")?; loop { - match subscription.fetch_data().await { + match subscription.value_updates.recv().await { Some(new_info) => { - for (zttid, sk_info) in new_info { - // note: there are blocking operations below, but it's considered fine for now - if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) { - for (safekeeper_id, info) in sk_info { - tli.record_safekeeper_info(&info, safekeeper_id).await? - } - } + // note: there are blocking operations below, but it's considered fine for now + if let Ok(tli) = GlobalTimelines::get(&conf, new_info.key.id, false) { + tli.record_safekeeper_info(&new_info.value, new_info.key.node_id) + .await? } } None => { diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index e1740cdcbf..5e749796dd 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -239,6 +239,19 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result remote_consistent_lsn: Lsn(0), peers: Peers(vec![]), }); + } else if version == 5 { + info!("reading safekeeper control file version {}", version); + let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?; + if oldstate.timeline_start_lsn != Lsn(0) { + return Ok(oldstate); + } + + // set special timeline_start_lsn because we don't know the real one + info!("setting timeline_start_lsn and local_start_lsn to Lsn(1)"); + oldstate.timeline_start_lsn = Lsn(1); + oldstate.local_start_lsn = Lsn(1); + + return Ok(oldstate); } bail!("unsupported safekeeper control file version {}", version) } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 7986fa5834..331baffbca 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -28,7 +28,7 @@ use utils::{ }; pub const SK_MAGIC: u32 = 0xcafeceefu32; -pub const SK_FORMAT_VERSION: u32 = 5; +pub const SK_FORMAT_VERSION: u32 = 6; const SK_PROTOCOL_VERSION: u32 = 2; const UNKNOWN_SERVER_VERSION: u32 = 0; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 2e415a53d0..bed6e447d7 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -11,7 +11,7 @@ use serde::Serialize; use tokio::sync::watch; use std::cmp::{max, min}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs::{self}; use std::sync::{Arc, Mutex, MutexGuard}; @@ -445,9 +445,9 @@ impl Timeline { } /// Prepare public safekeeper info for reporting. - pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result { + pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { let shared_state = self.mutex.lock().unwrap(); - Ok(SkTimelineInfo { + SkTimelineInfo { last_log_term: Some(shared_state.sk.get_epoch()), flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()), // note: this value is not flushed to control file yet and can be lost @@ -460,7 +460,7 @@ impl Timeline { peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), safekeeper_connstr: Some(conf.listen_pg_addr.clone()), backup_lsn: Some(shared_state.sk.inmem.backup_lsn), - }) + } } /// Update timeline state with peer safekeeper data. @@ -625,6 +625,8 @@ impl GlobalTimelines { zttid: ZTenantTimelineId, create: bool, ) -> Result> { + let _enter = info_span!("", timeline = %zttid.tenant_id).entered(); + let mut state = TIMELINES_STATE.lock().unwrap(); match state.timelines.get(&zttid) { @@ -667,7 +669,7 @@ impl GlobalTimelines { } /// Get ZTenantTimelineIDs of all active timelines. - pub fn get_active_timelines() -> Vec { + pub fn get_active_timelines() -> HashSet { let state = TIMELINES_STATE.lock().unwrap(); state .timelines diff --git a/test_runner/README.md b/test_runner/README.md index f95588462b..4b54c45175 100644 --- a/test_runner/README.md +++ b/test_runner/README.md @@ -45,7 +45,7 @@ If you want to run all tests that have the string "bench" in their names: Useful environment variables: -`ZENITH_BIN`: The directory where zenith binaries can be found. +`NEON_BIN`: The directory where neon binaries can be found. `POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found. `TEST_OUTPUT`: Set the directory where test state and test output files should go. diff --git a/test_runner/batch_others/test_createdropdb.py b/test_runner/batch_others/test_createdropdb.py index 392e5a6fd4..151ce997ee 100644 --- a/test_runner/batch_others/test_createdropdb.py +++ b/test_runner/batch_others/test_createdropdb.py @@ -35,9 +35,14 @@ def test_createdb(neon_simple_env: NeonEnv): with closing(db.connect(dbname='foodb')) as conn: with conn.cursor() as cur: # Check database size in both branches - cur.execute( - 'select pg_size_pretty(pg_database_size(%s)), pg_size_pretty(sum(pg_relation_size(oid))) from pg_class where relisshared is false;', - ('foodb', )) + cur.execute(""" + select pg_size_pretty(pg_database_size('foodb')), + pg_size_pretty( + sum(pg_relation_size(oid, 'main')) + +sum(pg_relation_size(oid, 'vm')) + +sum(pg_relation_size(oid, 'fsm')) + ) FROM pg_class where relisshared is false + """) res = cur.fetchone() # check that dbsize equals sum of all relation sizes, excluding shared ones # This is how we define dbsize in neon for now diff --git a/test_runner/batch_others/test_remote_storage.py b/test_runner/batch_others/test_remote_storage.py index 8a2748b880..b0ba8758cc 100644 --- a/test_runner/batch_others/test_remote_storage.py +++ b/test_runner/batch_others/test_remote_storage.py @@ -1,5 +1,5 @@ # It's possible to run any regular test with the local fs remote storage via -# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/zenith_zzz/'}" poetry ...... +# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ...... import shutil, os from contextlib import closing diff --git a/test_runner/batch_others/test_timeline_size.py b/test_runner/batch_others/test_timeline_size.py index 70dbe8ac4a..5734091757 100644 --- a/test_runner/batch_others/test_timeline_size.py +++ b/test_runner/batch_others/test_timeline_size.py @@ -8,7 +8,6 @@ import time def test_timeline_size(neon_simple_env: NeonEnv): env = neon_simple_env - # Branch at the point where only 100 rows were inserted new_timeline_id = env.neon_cli.create_branch('test_timeline_size', 'empty') client = env.pageserver.http_client() @@ -23,7 +22,6 @@ def test_timeline_size(neon_simple_env: NeonEnv): with conn.cursor() as cur: cur.execute("SHOW neon.timeline_id") - # Create table, and insert the first 100 rows cur.execute("CREATE TABLE foo (t text)") cur.execute(""" INSERT INTO foo @@ -43,6 +41,51 @@ def test_timeline_size(neon_simple_env: NeonEnv): "current_logical_size_non_incremental"] +def test_timeline_size_createdropdb(neon_simple_env: NeonEnv): + env = neon_simple_env + new_timeline_id = env.neon_cli.create_branch('test_timeline_size', 'empty') + + client = env.pageserver.http_client() + timeline_details = assert_local(client, env.initial_tenant, new_timeline_id) + assert timeline_details['local']['current_logical_size'] == timeline_details['local'][ + 'current_logical_size_non_incremental'] + + pgmain = env.postgres.create_start("test_timeline_size") + log.info("postgres is running on 'test_timeline_size' branch") + + with closing(pgmain.connect()) as conn: + with conn.cursor() as cur: + cur.execute("SHOW neon.timeline_id") + + res = assert_local(client, env.initial_tenant, new_timeline_id) + local_details = res['local'] + assert local_details["current_logical_size"] == local_details[ + "current_logical_size_non_incremental"] + + cur.execute('CREATE DATABASE foodb') + with closing(pgmain.connect(dbname='foodb')) as conn: + with conn.cursor() as cur2: + + cur2.execute("CREATE TABLE foo (t text)") + cur2.execute(""" + INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 10) g + """) + + res = assert_local(client, env.initial_tenant, new_timeline_id) + local_details = res['local'] + assert local_details["current_logical_size"] == local_details[ + "current_logical_size_non_incremental"] + + cur.execute('DROP DATABASE foodb') + + res = assert_local(client, env.initial_tenant, new_timeline_id) + local_details = res['local'] + assert local_details["current_logical_size"] == local_details[ + "current_logical_size_non_incremental"] + + # wait until received_lsn_lag is 0 def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60): started_at = time.time() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 12edcb8792..7506641fcb 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -50,7 +50,7 @@ A fixture is created with the decorator @pytest.fixture decorator. See docs: https://docs.pytest.org/en/6.2.x/fixture.html There are several environment variables that can control the running of tests: -ZENITH_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information. +NEON_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information. There's no need to import this file to use it. It should be declared as a plugin inside conftest.py, and that makes it available to all tests. @@ -151,7 +151,7 @@ def pytest_configure(config): return # Find the neon binaries. global neon_binpath - env_neon_bin = os.environ.get('ZENITH_BIN') + env_neon_bin = os.environ.get('NEON_BIN') if env_neon_bin: neon_binpath = env_neon_bin else: diff --git a/test_runner/performance/test_wal_backpressure.py b/test_runner/performance/test_wal_backpressure.py index 873d1132a7..862b5e1c5e 100644 --- a/test_runner/performance/test_wal_backpressure.py +++ b/test_runner/performance/test_wal_backpressure.py @@ -80,6 +80,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it thread.join() +@pytest.mark.timeout(1000) @pytest.mark.parametrize("n_tables", [5]) @pytest.mark.parametrize("scale", get_scales_matrix(5)) @pytest.mark.parametrize("num_iters", [10]) @@ -121,6 +122,7 @@ def start_pgbench_simple_update_workload(env: PgCompare, duration: int): env.flush() +@pytest.mark.timeout(1000) @pytest.mark.parametrize("scale", get_scales_matrix(100)) @pytest.mark.parametrize("duration", get_durations_matrix()) def test_pgbench_simple_update_workload(pg_compare: PgCompare, scale: int, duration: int): @@ -158,6 +160,7 @@ def start_pgbench_intensive_initialization(env: PgCompare, scale: int): ]) +@pytest.mark.timeout(1000) @pytest.mark.parametrize("scale", get_scales_matrix(1000)) def test_pgbench_intensive_init_workload(pg_compare: PgCompare, scale: int): env = pg_compare