diff --git a/.github/actions/allure-report-generate/action.yml b/.github/actions/allure-report-generate/action.yml index 07120c4c8a..7f7fa9e7a1 100644 --- a/.github/actions/allure-report-generate/action.yml +++ b/.github/actions/allure-report-generate/action.yml @@ -147,6 +147,8 @@ runs: echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT echo "report-json-url=${REPORT_URL%/index.html}/data/suites.json" >> $GITHUB_OUTPUT + echo "[Allure Report](${REPORT_URL})" >> ${GITHUB_STEP_SUMMARY} + - name: Release lock if: always() shell: bash -euxo pipefail {0} diff --git a/.github/actions/run-python-test-set/action.yml b/.github/actions/run-python-test-set/action.yml index 1dd294d17a..bb120e9470 100644 --- a/.github/actions/run-python-test-set/action.yml +++ b/.github/actions/run-python-test-set/action.yml @@ -48,6 +48,10 @@ inputs: description: 'Whether to rerun flaky tests' required: false default: 'false' + pg_version: + description: 'Postgres version to use for tests' + required: false + default: 'v14' runs: using: "composite" @@ -68,7 +72,7 @@ runs: prefix: latest - name: Download compatibility snapshot for Postgres 14 - if: inputs.build_type != 'remote' + if: inputs.build_type != 'remote' && inputs.pg_version == 'v14' uses: ./.github/actions/download with: name: compatibility-snapshot-${{ inputs.build_type }}-pg14 @@ -106,13 +110,14 @@ runs: ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage') ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage') RERUN_FLAKY: ${{ inputs.rerun_flaky }} + PG_VERSION: ${{ inputs.pg_version }} shell: bash -euxo pipefail {0} run: | # PLATFORM will be embedded in the perf test report # and it is needed to distinguish different environments export PLATFORM=${PLATFORM:-github-actions-selfhosted} export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install} - export DEFAULT_PG_VERSION=${DEFAULT_PG_VERSION:-14} + export DEFAULT_PG_VERSION=${PG_VERSION#v} if [ "${BUILD_TYPE}" = "remote" ]; then export REMOTE_ENV=1 @@ -193,7 +198,7 @@ runs: fi - name: Upload compatibility snapshot for Postgres 14 - if: github.ref_name == 'release' + if: github.ref_name == 'release' && inputs.pg_version == 'v14' uses: ./.github/actions/upload with: name: compatibility-snapshot-${{ inputs.build_type }}-pg14-${{ github.run_id }} @@ -206,4 +211,4 @@ runs: uses: ./.github/actions/allure-report-store with: report-dir: /tmp/test_output/allure/results - unique-key: ${{ inputs.test_selection }}-${{ inputs.build_type }} + unique-key: ${{ inputs.build_type }} diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 816c5ee711..22c025dd89 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -1,6 +1,6 @@ -## Describe your changes +## Problem -## Issue ticket number and link +## Summary of changes ## Checklist before requesting a review diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index 11363b2407..08b74a2656 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -16,12 +16,12 @@ on: workflow_dispatch: # adds ability to run this manually inputs: region_id: - description: 'Use a particular region. If not set the default region will be used' + description: 'Project region id. If not set, the default region will be used' required: false default: 'aws-us-east-2' save_perf_report: type: boolean - description: 'Publish perf report or not. If not set, the report is published only for the main branch' + description: 'Publish perf report. If not set, the report will be published only for the main branch' required: false defaults: @@ -125,13 +125,14 @@ jobs: matrix='{ "platform": [ "neon-captest-new", - "neon-captest-reuse" + "neon-captest-reuse", + "neonvm-captest-new" ], "db_size": [ "10gb" ], - "include": [ - { "platform": "neon-captest-freetier", "db_size": "3gb" }, - { "platform": "neon-captest-new", "db_size": "50gb" } - ] + "include": [{ "platform": "neon-captest-freetier", "db_size": "3gb" }, + { "platform": "neon-captest-new", "db_size": "50gb" }, + { "platform": "neonvm-captest-freetier", "db_size": "3gb" }, + { "platform": "neonvm-captest-new", "db_size": "50gb" }] }' if [ "$(date +%A)" = "Saturday" ]; then @@ -197,7 +198,7 @@ jobs: echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH - name: Create Neon Project - if: contains(fromJson('["neon-captest-new", "neon-captest-freetier"]'), matrix.platform) + if: contains(fromJson('["neon-captest-new", "neon-captest-freetier", "neonvm-captest-new", "neonvm-captest-freetier"]'), matrix.platform) id: create-neon-project uses: ./.github/actions/neon-project-create with: @@ -205,6 +206,7 @@ jobs: postgres_version: ${{ env.DEFAULT_PG_VERSION }} api_key: ${{ secrets.NEON_STAGING_API_KEY }} compute_units: ${{ (matrix.platform == 'neon-captest-freetier' && '[0.25, 0.25]') || '[1, 1]' }} + provisioner: ${{ (contains(matrix.platform, 'neonvm-') && 'k8s-neonvm') || 'k8s-pod' }} - name: Set up Connection String id: set-up-connstr @@ -213,7 +215,7 @@ jobs: neon-captest-reuse) CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }} ;; - neon-captest-new | neon-captest-freetier) + neon-captest-new | neon-captest-freetier | neonvm-captest-new | neonvm-captest-freetier) CONNSTR=${{ steps.create-neon-project.outputs.dsn }} ;; rds-aurora) @@ -223,7 +225,7 @@ jobs: CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }} ;; *) - echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-freetier', 'rds-aurora', or 'rds-postgres'" + echo >&2 "Unknown PLATFORM=${PLATFORM}" exit 1 ;; esac diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 5a09f0b4aa..9114e02622 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -324,7 +324,8 @@ jobs: runs-on: [ self-hosted, gen3, large ] container: image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned - options: --init + # Default shared memory is 64mb + options: --init --shm-size=512mb needs: [ build-neon ] strategy: fail-fast: false @@ -350,8 +351,8 @@ jobs: real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}" real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}" rerun_flaky: true + pg_version: ${{ matrix.pg_version }} env: - DEFAULT_PG_VERSION: ${{ matrix.pg_version }} TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR }} CHECK_ONDISK_DATA_COMPATIBILITY: nonempty @@ -363,7 +364,8 @@ jobs: runs-on: [ self-hosted, gen3, small ] container: image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned - options: --init + # Default shared memory is 64mb + options: --init --shm-size=512mb needs: [ build-neon ] if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks') strategy: @@ -490,37 +492,43 @@ jobs: - name: Merge coverage data run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge - - name: Build and upload coverage report + - name: Build coverage report + env: + COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }} run: | - COMMIT_SHA=${{ github.event.pull_request.head.sha }} - COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}} - COMMIT_URL=https://github.com/${{ github.repository }}/commit/$COMMIT_SHA - scripts/coverage \ --dir=/tmp/coverage report \ --input-objects=/tmp/coverage/binaries.list \ - --commit-url=$COMMIT_URL \ + --commit-url=${COMMIT_URL} \ --format=github - REPORT_URL=https://${{ github.repository_owner }}.github.io/zenith-coverage-data/$COMMIT_SHA + - name: Upload coverage report + id: upload-coverage-report + env: + BUCKET: neon-github-public-dev + COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }} + run: | + aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA} - scripts/git-upload \ - --repo=https://${{ secrets.VIP_VAP_ACCESS_TOKEN }}@github.com/${{ github.repository_owner }}/zenith-coverage-data.git \ - --message="Add code coverage for $COMMIT_URL" \ - copy /tmp/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE + REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html + echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT - # Add link to the coverage report to the commit - curl -f -X POST \ - https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \ - -H "Accept: application/vnd.github.v3+json" \ - --user "${{ secrets.CI_ACCESS_TOKEN }}" \ - --data \ - "{ - \"state\": \"success\", - \"context\": \"neon-coverage\", - \"description\": \"Coverage report is ready\", - \"target_url\": \"$REPORT_URL\" - }" + - uses: actions/github-script@v6 + env: + REPORT_URL: ${{ steps.upload-coverage-report.outputs.report-url }} + COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }} + with: + script: | + const { REPORT_URL, COMMIT_SHA } = process.env + + await github.rest.repos.createCommitStatus({ + owner: context.repo.owner, + repo: context.repo.repo, + sha: `${COMMIT_SHA}`, + state: 'success', + target_url: `${REPORT_URL}`, + context: 'Code coverage report', + }) trigger-e2e-tests: runs-on: [ self-hosted, gen3, small ] diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 43ebefc477..c5b3ff7459 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -2,7 +2,7 @@ Howdy! Usual good software engineering practices apply. Write tests. Write comments. Follow standard Rust coding practices where -possible. Use 'cargo fmt' and 'clippy' to tidy up formatting. +possible. Use `cargo fmt` and `cargo clippy` to tidy up formatting. There are soft spots in the code, which could use cleanup, refactoring, additional comments, and so forth. Let's try to raise the diff --git a/Cargo.lock b/Cargo.lock index 167acf0e69..55418473d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -230,40 +230,38 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56a636c44c77fa18bdba56126a34d30cfe5538fe88f7d34988fa731fee143ddd" +checksum = "fc00553f5f3c06ffd4510a9d576f92143618706c45ea6ff81e84ad9be9588abd" dependencies = [ + "aws-credential-types", "aws-http", - "aws-sdk-sso", "aws-sdk-sts", - "aws-smithy-async 0.51.0", - "aws-smithy-client 0.51.0", - "aws-smithy-http 0.51.0", - "aws-smithy-http-tower 0.51.0", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", "aws-smithy-json", - "aws-smithy-types 0.51.0", - "aws-types 0.51.0", + "aws-smithy-types", + "aws-types", "bytes", - "hex", + "fastrand", "http", "hyper", - "ring", "time", "tokio", "tower", "tracing", - "zeroize", ] [[package]] name = "aws-credential-types" -version = "0.55.1" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4232d3729eefc287adc0d5a8adc97b7d94eefffe6bbe94312cc86c7ab6b06ce" +checksum = "4cb57ac6088805821f78d282c0ba8aec809f11cbee10dda19a97b03ab040ccc2" dependencies = [ - "aws-smithy-async 0.55.1", - "aws-smithy-types 0.55.1", + "aws-smithy-async", + "aws-smithy-types", "fastrand", "tokio", "tracing", @@ -272,13 +270,13 @@ dependencies = [ [[package]] name = "aws-endpoint" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ca8f374874f6459aaa88dc861d7f5d834ca1ff97668eae190e97266b5f6c3fb" +checksum = "9c5f6f84a4f46f95a9bb71d9300b73cd67eb868bc43ae84f66ad34752299f4ac" dependencies = [ - "aws-smithy-http 0.51.0", - "aws-smithy-types 0.51.0", - "aws-types 0.51.0", + "aws-smithy-http", + "aws-smithy-types", + "aws-types", "http", "regex", "tracing", @@ -286,13 +284,14 @@ dependencies = [ [[package]] name = "aws-http" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78d41e19e779b73463f5f0c21b3aacc995f4ba783ab13a7ae9f5dfb159a551b4" +checksum = "a754683c322f7dc5167484266489fdebdcd04d26e53c162cad1f3f949f2c5671" dependencies = [ - "aws-smithy-http 0.51.0", - "aws-smithy-types 0.51.0", - "aws-types 0.51.0", + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-types", + "aws-types", "bytes", "http", "http-body", @@ -304,127 +303,104 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "0.21.0" +version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9f08665c8e03aca8cb092ef01e617436ebfa977fddc1240e1b062488ab5d48a" +checksum = "392b9811ca489747ac84349790e49deaa1f16631949e7dd4156000251c260eae" dependencies = [ + "aws-credential-types", "aws-endpoint", "aws-http", "aws-sig-auth", "aws-sigv4", - "aws-smithy-async 0.51.0", + "aws-smithy-async", "aws-smithy-checksums", - "aws-smithy-client 0.51.0", + "aws-smithy-client", "aws-smithy-eventstream", - "aws-smithy-http 0.51.0", - "aws-smithy-http-tower 0.51.0", - "aws-smithy-types 0.51.0", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", "aws-smithy-xml", - "aws-types 0.51.0", + "aws-types", "bytes", - "bytes-utils", "http", "http-body", + "once_cell", + "percent-encoding", + "regex", "tokio-stream", "tower", "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sts" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d0fbe3c2c342bc8dfea4bb43937405a8ec06f99140a0dcb9c7b59e54dfa93a1" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "regex", + "tower", + "tracing", ] -[[package]] -name = "aws-sdk-sso" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86dcb1cb71aa8763b327542ead410424515cff0cde5b753eedd2917e09c63734" -dependencies = [ - "aws-endpoint", - "aws-http", - "aws-sig-auth", - "aws-smithy-async 0.51.0", - "aws-smithy-client 0.51.0", - "aws-smithy-http 0.51.0", - "aws-smithy-http-tower 0.51.0", - "aws-smithy-json", - "aws-smithy-types 0.51.0", - "aws-types 0.51.0", - "bytes", - "http", - "tokio-stream", - "tower", -] - -[[package]] -name = "aws-sdk-sts" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdfcf584297c666f6b472d5368a78de3bc714b6e0a53d7fbf76c3e347c292ab1" -dependencies = [ - "aws-endpoint", - "aws-http", - "aws-sig-auth", - "aws-smithy-async 0.51.0", - "aws-smithy-client 0.51.0", - "aws-smithy-http 0.51.0", - "aws-smithy-http-tower 0.51.0", - "aws-smithy-query", - "aws-smithy-types 0.51.0", - "aws-smithy-xml", - "aws-types 0.51.0", - "bytes", - "http", - "tower", -] - [[package]] name = "aws-sig-auth" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12cbe7b2be9e185c1fbce27fc9c41c66b195b32d89aa099f98768d9544221308" +checksum = "84dc92a63ede3c2cbe43529cb87ffa58763520c96c6a46ca1ced80417afba845" dependencies = [ + "aws-credential-types", "aws-sigv4", "aws-smithy-eventstream", - "aws-smithy-http 0.51.0", - "aws-types 0.51.0", + "aws-smithy-http", + "aws-types", "http", "tracing", ] [[package]] name = "aws-sigv4" -version = "0.51.1" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c0b2658d2cb66dbf02f0e8dee80810ef1e0ca3530ede463e0ef994c301087d1" +checksum = "392fefab9d6fcbd76d518eb3b1c040b84728ab50f58df0c3c53ada4bea9d327e" dependencies = [ "aws-smithy-eventstream", - "aws-smithy-http 0.51.0", + "aws-smithy-http", "bytes", "form_urlencoded", "hex", + "hmac", "http", "once_cell", "percent-encoding", "regex", - "ring", + "sha2", "time", "tracing", ] [[package]] name = "aws-smithy-async" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b3442b4c5d3fc39891a2e5e625735fba6b24694887d49c6518460fde98247a9" -dependencies = [ - "futures-util", - "pin-project-lite", - "tokio", - "tokio-stream", -] - -[[package]] -name = "aws-smithy-async" -version = "0.55.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88573bcfbe1dcfd54d4912846df028b42d6255cbf9ce07be216b1bbfd11fc4b9" +checksum = "ae23b9fe7a07d0919000116c4c5c0578303fbce6fc8d32efca1f7759d4c20faf" dependencies = [ "futures-util", "pin-project-lite", @@ -434,12 +410,12 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc227e36e346f45298288359f37123e1a92628d1cec6b11b5eb335553278bd9e" +checksum = "a6367acbd6849b8c7c659e166955531274ae147bf83ab4312885991f6b6706cb" dependencies = [ - "aws-smithy-http 0.51.0", - "aws-smithy-types 0.51.0", + "aws-smithy-http", + "aws-smithy-types", "bytes", "crc32c", "crc32fast", @@ -455,14 +431,14 @@ dependencies = [ [[package]] name = "aws-smithy-client" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff28d553714f8f54cd921227934fc13a536a1c03f106e56b362fd57e16d450ad" +checksum = "5230d25d244a51339273b8870f0f77874cd4449fb4f8f629b21188ae10cfc0ba" dependencies = [ - "aws-smithy-async 0.51.0", - "aws-smithy-http 0.51.0", - "aws-smithy-http-tower 0.51.0", - "aws-smithy-types 0.51.0", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-types", "bytes", "fastrand", "http", @@ -471,26 +447,7 @@ dependencies = [ "hyper-rustls", "lazy_static", "pin-project-lite", - "tokio", - "tower", - "tracing", -] - -[[package]] -name = "aws-smithy-client" -version = "0.55.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2f52352bae50d3337d5d6151b695d31a8c10ebea113eca5bead531f8301b067" -dependencies = [ - "aws-smithy-async 0.55.1", - "aws-smithy-http 0.55.1", - "aws-smithy-http-tower 0.55.1", - "aws-smithy-types 0.55.1", - "bytes", - "fastrand", - "http", - "http-body", - "pin-project-lite", + "rustls 0.20.8", "tokio", "tower", "tracing", @@ -498,23 +455,23 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7ea0df7161ce65b5c8ca6eb709a1a907376fa18226976e41c748ce02ccccf24" +checksum = "22d2a2bcc16e5c4d949ffd2b851da852b9bbed4bb364ed4ae371b42137ca06d9" dependencies = [ - "aws-smithy-types 0.51.0", + "aws-smithy-types", "bytes", "crc32fast", ] [[package]] name = "aws-smithy-http" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf58ed4fefa61dbf038e5421a521cbc2c448ef69deff0ab1d915d8a10eda5664" +checksum = "b60e2133beb9fe6ffe0b70deca57aaeff0a35ad24a9c6fab2fd3b4f45b99fdb5" dependencies = [ "aws-smithy-eventstream", - "aws-smithy-types 0.51.0", + "aws-smithy-types", "bytes", "bytes-utils", "futures-core", @@ -530,49 +487,14 @@ dependencies = [ "tracing", ] -[[package]] -name = "aws-smithy-http" -version = "0.55.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03bcc02d7ed9649d855c8ce4a735e9848d7b8f7568aad0504c158e3baa955df8" -dependencies = [ - "aws-smithy-types 0.55.1", - "bytes", - "bytes-utils", - "futures-core", - "http", - "http-body", - "hyper", - "once_cell", - "percent-encoding", - "pin-project-lite", - "pin-utils", - "tracing", -] - [[package]] name = "aws-smithy-http-tower" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20c96d7bd35e7cf96aca1134b2f81b1b59ffe493f7c6539c051791cbbf7a42d3" +checksum = "3a4d94f556c86a0dd916a5d7c39747157ea8cb909ca469703e20fee33e448b67" dependencies = [ - "aws-smithy-http 0.51.0", - "bytes", - "http", - "http-body", - "pin-project-lite", - "tower", - "tracing", -] - -[[package]] -name = "aws-smithy-http-tower" -version = "0.55.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da88b3a860f65505996c29192d800f1aeb9480440f56d63aad33a3c12045017a" -dependencies = [ - "aws-smithy-http 0.55.1", - "aws-smithy-types 0.55.1", + "aws-smithy-http", + "aws-smithy-types", "bytes", "http", "http-body", @@ -583,40 +505,28 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8324ba98c8a94187723cc16c37aefa09504646ee65c3d2c3af495bab5ea701b" +checksum = "5ce3d6e6ebb00b2cce379f079ad5ec508f9bcc3a9510d9b9c1840ed1d6f8af39" dependencies = [ - "aws-smithy-types 0.51.0", + "aws-smithy-types", ] [[package]] name = "aws-smithy-query" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83834ed2ff69ea6f6657baf205267dc2c0abe940703503a3e5d60ce23be3d306" +checksum = "d58edfca32ef9bfbc1ca394599e17ea329cb52d6a07359827be74235b64b3298" dependencies = [ - "aws-smithy-types 0.51.0", + "aws-smithy-types", "urlencoding", ] [[package]] name = "aws-smithy-types" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b02e06ea63498c43bc0217ea4d16605d4e58d85c12fc23f6572ff6d0a840c61" -dependencies = [ - "itoa", - "num-integer", - "ryu", - "time", -] - -[[package]] -name = "aws-smithy-types" -version = "0.55.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd0afc731fd1417d791f9145a1e0c30e23ae0beaab9b4814017708ead2fc20f1" +checksum = "58db46fc1f4f26be01ebdb821751b4e2482cd43aa2b64a0348fb89762defaffa" dependencies = [ "base64-simd", "itoa", @@ -627,40 +537,24 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "246e9f83dd1fdf5d347fa30ae4ad30a9d1d42ce4cd74a93d94afa874646f94cd" +checksum = "fb557fe4995bd9ec87fb244bbb254666a971dc902a783e9da8b7711610e9664c" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "0.51.0" +version = "0.55.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05701d32da168b44f7ee63147781aed8723e792cc131cb9b18363b5393f17f70" -dependencies = [ - "aws-smithy-async 0.51.0", - "aws-smithy-client 0.51.0", - "aws-smithy-http 0.51.0", - "aws-smithy-types 0.51.0", - "http", - "rustc_version", - "tracing", - "zeroize", -] - -[[package]] -name = "aws-types" -version = "0.55.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9b082e329d9a304d39e193ad5c7ab363a0d6507aca6965e0673a746686fb0cc" +checksum = "de0869598bfe46ec44ffe17e063ed33336e59df90356ca8ff0e8da6f7c1d994b" dependencies = [ "aws-credential-types", - "aws-smithy-async 0.55.1", - "aws-smithy-client 0.55.1", - "aws-smithy-http 0.55.1", - "aws-smithy-types 0.55.1", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-types", "http", "rustc_version", "tracing", @@ -3367,9 +3261,10 @@ dependencies = [ "anyhow", "async-trait", "aws-config", + "aws-credential-types", "aws-sdk-s3", - "aws-smithy-http 0.51.0", - "aws-types 0.55.1", + "aws-smithy-http", + "aws-types", "hyper", "metrics", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index b73e29ef6c..c901532f86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,9 +21,10 @@ anyhow = { version = "1.0", features = ["backtrace"] } async-stream = "0.3" async-trait = "0.1" atty = "0.2.14" -aws-config = { version = "0.51.0", default-features = false, features=["rustls"] } -aws-sdk-s3 = "0.21.0" -aws-smithy-http = "0.51.0" +aws-config = { version = "0.55", default-features = false, features=["rustls"] } +aws-sdk-s3 = "0.25" +aws-smithy-http = "0.55" +aws-credential-types = "0.55" aws-types = "0.55" base64 = "0.13.0" bincode = "1.3" diff --git a/README.md b/README.md index ce6ec09d24..1b0164d5c0 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ The Neon storage engine consists of two major components: - Pageserver. Scalable storage backend for the compute nodes. - Safekeepers. The safekeepers form a redundant WAL service that received WAL from the compute node, and stores it durably until it has been processed by the pageserver and uploaded to cloud storage. -See developer documentation in [/docs/SUMMARY.md](/docs/SUMMARY.md) for more information. +See developer documentation in [SUMMARY.md](/docs/SUMMARY.md) for more information. ## Running local installation @@ -238,9 +238,9 @@ CARGO_BUILD_FLAGS="--features=testing" make ## Documentation -[/docs/](/docs/) Contains a top-level overview of all available markdown documentation. +[docs](/docs) Contains a top-level overview of all available markdown documentation. -- [/docs/sourcetree.md](/docs/sourcetree.md) contains overview of source tree layout. +- [sourcetree.md](/docs/sourcetree.md) contains overview of source tree layout. To view your `rustdoc` documentation in a browser, try running `cargo doc --no-deps --open` @@ -265,6 +265,6 @@ To get more familiar with this aspect, refer to: ## Join the development -- Read `CONTRIBUTING.md` to learn about project code style and practices. -- To get familiar with a source tree layout, use [/docs/sourcetree.md](/docs/sourcetree.md). +- Read [CONTRIBUTING.md](/CONTRIBUTING.md) to learn about project code style and practices. +- To get familiar with a source tree layout, use [sourcetree.md](/docs/sourcetree.md). - To learn more about PostgreSQL internals, check http://www.interdb.jp/pg/index.html diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 75991045a4..f022be3910 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -8,9 +8,7 @@ use std::process::{Child, Command}; use std::{io, result}; use anyhow::{bail, Context}; -use pageserver_api::models::{ - TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo, -}; +use pageserver_api::models::{self, TenantInfo, TimelineInfo}; use postgres_backend::AuthType; use postgres_connection::{parse_host_port, PgConnectionConfig}; use reqwest::blocking::{Client, RequestBuilder, Response}; @@ -316,8 +314,8 @@ impl PageServerNode { settings: HashMap<&str, &str>, ) -> anyhow::Result { let mut settings = settings.clone(); - let request = TenantCreateRequest { - new_tenant_id, + + let config = models::TenantConfig { checkpoint_distance: settings .remove("checkpoint_distance") .map(|x| x.parse::()) @@ -372,6 +370,10 @@ impl PageServerNode { .remove("evictions_low_residence_duration_metric_threshold") .map(|x| x.to_string()), }; + let request = models::TenantCreateRequest { + new_tenant_id, + config, + }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") } @@ -392,9 +394,9 @@ impl PageServerNode { } pub fn tenant_config(&self, tenant_id: TenantId, settings: HashMap<&str, &str>) -> Result<()> { - self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))? - .json(&TenantConfigRequest { - tenant_id, + let config = { + // Braces to make the diff easier to read + models::TenantConfig { checkpoint_distance: settings .get("checkpoint_distance") .map(|x| x.parse::()) @@ -451,7 +453,11 @@ impl PageServerNode { evictions_low_residence_duration_metric_threshold: settings .get("evictions_low_residence_duration_metric_threshold") .map(|x| x.to_string()), - }) + } + }; + + self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))? + .json(&models::TenantConfigRequest { tenant_id, config }) .send()? .error_from_body()?; @@ -483,7 +489,7 @@ impl PageServerNode { Method::POST, format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id), )? - .json(&TimelineCreateRequest { + .json(&models::TimelineCreateRequest { new_timeline_id, ancestor_start_lsn, ancestor_timeline_id, diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index e4df81c9ad..0bcdb3c3a8 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -136,6 +136,20 @@ pub struct TenantCreateRequest { #[serde(default)] #[serde_as(as = "Option")] pub new_tenant_id: Option, + #[serde(flatten)] + pub config: TenantConfig, +} + +impl std::ops::Deref for TenantCreateRequest { + type Target = TenantConfig; + + fn deref(&self) -> &Self::Target { + &self.config + } +} + +#[derive(Serialize, Deserialize, Default)] +pub struct TenantConfig { pub checkpoint_distance: Option, pub checkpoint_timeout: Option, pub compaction_target_size: Option, @@ -182,33 +196,21 @@ impl TenantCreateRequest { pub struct TenantConfigRequest { #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, - #[serde(default)] - pub checkpoint_distance: Option, - pub checkpoint_timeout: Option, - pub compaction_target_size: Option, - pub compaction_period: Option, - pub compaction_threshold: Option, - pub gc_horizon: Option, - pub gc_period: Option, - pub image_creation_threshold: Option, - pub pitr_interval: Option, - pub walreceiver_connect_timeout: Option, - pub lagging_wal_timeout: Option, - pub max_lsn_wal_lag: Option, - pub trace_read_requests: Option, - // We defer the parsing of the eviction_policy field to the request handler. - // Otherwise we'd have to move the types for eviction policy into this package. - // We might do that once the eviction feature has stabilizied. - // For now, this field is not even documented in the openapi_spec.yml. - pub eviction_policy: Option, - pub min_resident_size_override: Option, - pub evictions_low_residence_duration_metric_threshold: Option, + #[serde(flatten)] + pub config: TenantConfig, +} + +impl std::ops::Deref for TenantConfigRequest { + type Target = TenantConfig; + + fn deref(&self) -> &Self::Target { + &self.config + } } impl TenantConfigRequest { pub fn new(tenant_id: TenantId) -> TenantConfigRequest { - TenantConfigRequest { - tenant_id, + let config = TenantConfig { checkpoint_distance: None, checkpoint_timeout: None, compaction_target_size: None, @@ -225,7 +227,8 @@ impl TenantConfigRequest { eviction_policy: None, min_resident_size_override: None, evictions_low_residence_duration_metric_threshold: None, - } + }; + TenantConfigRequest { tenant_id, config } } } diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 6bc89ed37e..9c39b46cc1 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -146,6 +146,10 @@ pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8; pub const XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED: u8 = (1 << 1) as u8; pub const XLH_DELETE_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8; +// From replication/message.h +pub const XLOG_LOGICAL_MESSAGE: u8 = 0x00; + +// From rmgrlist.h pub const RM_XLOG_ID: u8 = 0; pub const RM_XACT_ID: u8 = 1; pub const RM_SMGR_ID: u8 = 2; @@ -157,6 +161,7 @@ pub const RM_RELMAP_ID: u8 = 7; pub const RM_STANDBY_ID: u8 = 8; pub const RM_HEAP2_ID: u8 = 9; pub const RM_HEAP_ID: u8 = 10; +pub const RM_LOGICALMSG_ID: u8 = 21; // from xlogreader.h pub const XLR_INFO_MASK: u8 = 0x0F; diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index da15823b69..0877a38dd9 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -12,6 +12,7 @@ aws-smithy-http.workspace = true aws-types.workspace = true aws-config.workspace = true aws-sdk-s3.workspace = true +aws-credential-types.workspace = true hyper = { workspace = true, features = ["stream"] } serde.workspace = true serde_json.workspace = true diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index e6c1e19ad5..0be8c72fe0 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -9,14 +9,15 @@ use std::sync::Arc; use anyhow::Context; use aws_config::{ environment::credentials::EnvironmentVariableCredentialsProvider, - imds::credentials::ImdsCredentialsProvider, - meta::credentials::{CredentialsProviderChain, LazyCachingCredentialsProvider}, + imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain, }; +use aws_credential_types::cache::CredentialsCache; use aws_sdk_s3::{ - config::Config, - error::{GetObjectError, GetObjectErrorKind}, - types::{ByteStream, SdkError}, - Client, Endpoint, Region, + config::{Config, Region}, + error::SdkError, + operation::get_object::GetObjectError, + primitives::ByteStream, + Client, }; use aws_smithy_http::body::SdkBody; use hyper::Body; @@ -125,28 +126,23 @@ impl S3Bucket { let credentials_provider = { // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY" - let env_creds = EnvironmentVariableCredentialsProvider::new(); + CredentialsProviderChain::first_try( + "env", + EnvironmentVariableCredentialsProvider::new(), + ) // uses imds v2 - let imds = ImdsCredentialsProvider::builder().build(); - - // finally add caching. - // this might change in future, see https://github.com/awslabs/aws-sdk-rust/issues/629 - LazyCachingCredentialsProvider::builder() - .load(CredentialsProviderChain::first_try("env", env_creds).or_else("imds", imds)) - .build() + .or_else("imds", ImdsCredentialsProvider::builder().build()) }; let mut config_builder = Config::builder() .region(Region::new(aws_config.bucket_region.clone())) + .credentials_cache(CredentialsCache::lazy()) .credentials_provider(credentials_provider); if let Some(custom_endpoint) = aws_config.endpoint.clone() { - let endpoint = Endpoint::immutable( - custom_endpoint - .parse() - .expect("Failed to parse S3 custom endpoint"), - ); - config_builder.set_endpoint_resolver(Some(Arc::new(endpoint))); + config_builder = config_builder + .endpoint_url(custom_endpoint) + .force_path_style(true); } let client = Client::from_conf(config_builder.build()); @@ -229,14 +225,9 @@ impl S3Bucket { ))), }) } - Err(SdkError::ServiceError { - err: - GetObjectError { - kind: GetObjectErrorKind::NoSuchKey(..), - .. - }, - .. - }) => Err(DownloadError::NotFound), + Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => { + Err(DownloadError::NotFound) + } Err(e) => { metrics::inc_get_object_fail(); Err(DownloadError::Other(anyhow::anyhow!( diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 9d1c252f00..8fd1d55501 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -5,7 +5,7 @@ //! use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; -use crate::tenant::mgr; +use crate::tenant::{mgr, LogicalSizeCalculationCause}; use anyhow; use chrono::Utc; use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; @@ -335,7 +335,9 @@ pub async fn calculate_synthetic_size_worker( if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await { - if let Err(e) = tenant.calculate_synthetic_size(ctx).await { + if let Err(e) = tenant.calculate_synthetic_size( + LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize, + ctx).await { error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e); } } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 330587310f..62664733ea 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -747,7 +747,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/TenantCreateInfo" + $ref: "#/components/schemas/TenantCreateRequest" responses: "201": description: New tenant created successfully @@ -794,7 +794,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/TenantConfigInfo" + $ref: "#/components/schemas/TenantConfigRequest" responses: "200": description: OK @@ -846,7 +846,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/TenantConfig" + $ref: "#/components/schemas/TenantConfigResponse" "400": description: Malformed get tenanant config request content: @@ -909,35 +909,27 @@ components: See the tenant `/attach` endpoint for more information. type: string enum: [ "maybe", "attached" ] - TenantCreateInfo: + TenantCreateRequest: + allOf: + - $ref: '#/components/schemas/TenantConfig' + - type: object + properties: + new_tenant_id: + type: string + format: hex + TenantConfigRequest: + allOf: + - $ref: '#/components/schemas/TenantConfig' + - type: object + required: + - tenant_id + properties: + tenant_id: + type: string + format: hex + TenantConfig: type: object properties: - new_tenant_id: - type: string - format: hex - tenant_id: - type: string - format: hex - gc_period: - type: string - gc_horizon: - type: integer - pitr_interval: - type: string - checkpoint_distance: - type: integer - checkpoint_timeout: - type: string - compaction_period: - type: string - compaction_threshold: - type: string - TenantConfigInfo: - type: object - properties: - tenant_id: - type: string - format: hex gc_period: type: string gc_horizon: @@ -964,13 +956,13 @@ components: type: integer trace_read_requests: type: boolean - TenantConfig: + TenantConfigResponse: type: object properties: tenant_specific_overrides: - $ref: "#/components/schemas/TenantConfigInfo" + $ref: "#/components/schemas/TenantConfig" effective_config: - $ref: "#/components/schemas/TenantConfigInfo" + $ref: "#/components/schemas/TenantConfig" TimelineInfo: type: object required: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index bdea0d256f..e3a6471f53 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -19,13 +19,14 @@ use super::models::{ }; use crate::context::{DownloadBehavior, RequestContext}; use crate::disk_usage_eviction_task; +use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::TenantConfOpt; use crate::tenant::mgr::{TenantMapInsertError, TenantStateError}; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; -use crate::tenant::{PageReconstructError, Timeline}; +use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline}; use crate::{config::PageServerConf, tenant::mgr}; use utils::{ auth::JwtAuth, @@ -105,6 +106,9 @@ impl From for ApiError { PageReconstructError::Cancelled => { ApiError::InternalServerError(anyhow::anyhow!("request was cancelled")) } + PageReconstructError::AncestorStopping(_) => { + ApiError::InternalServerError(anyhow::Error::new(pre)) + } PageReconstructError::WalRedo(pre) => { ApiError::InternalServerError(anyhow::Error::new(pre)) } @@ -396,9 +400,17 @@ async fn tenant_attach_handler(request: Request) -> Result, let state = get_state(&request); if let Some(remote_storage) = &state.remote_storage { - mgr::attach_tenant(state.conf, tenant_id, remote_storage.clone(), &ctx) - .instrument(info_span!("tenant_attach", tenant = %tenant_id)) - .await?; + mgr::attach_tenant( + state.conf, + tenant_id, + // XXX: Attach should provide the config, especially during tenant migration. + // See https://github.com/neondatabase/neon/issues/1555 + TenantConfOpt::default(), + remote_storage.clone(), + &ctx, + ) + .instrument(info_span!("tenant_attach", tenant = %tenant_id)) + .await?; } else { return Err(ApiError::BadRequest(anyhow!( "attach_tenant is not possible because pageserver was configured without remote storage" @@ -536,7 +548,11 @@ async fn tenant_size_handler(request: Request) -> Result, A // this can be long operation let inputs = tenant - .gather_size_inputs(retention_period, &ctx) + .gather_size_inputs( + retention_period, + LogicalSizeCalculationCause::TenantSizeHandler, + &ctx, + ) .await .map_err(ApiError::InternalServerError)?; @@ -703,11 +719,17 @@ pub fn html_response(status: StatusCode, data: String) -> Result, async fn tenant_create_handler(mut request: Request) -> Result, ApiError> { check_permission(&request, None)?; + let _timer = STORAGE_TIME_GLOBAL + .get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()]) + .expect("bug") + .start_timer(); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let request_data: TenantCreateRequest = json_request(&mut request).await?; - let tenant_conf = TenantConfOpt::try_from(&request_data).map_err(ApiError::BadRequest)?; + let tenant_conf = + TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?; let target_tenant_id = request_data .new_tenant_id @@ -738,6 +760,7 @@ async fn tenant_create_handler(mut request: Request) -> Result = Lazy::new(|| { register_counter_vec!( @@ -489,6 +507,15 @@ pub static TENANT_TASK_EVENTS: Lazy = Lazy::new(|| { .expect("Failed to register tenant_task_events metric") }); +pub static BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_background_loop_period_overrun_count", + "Incremented whenever warn_when_period_overrun() logs a warning.", + &["task", "period"], + ) + .expect("failed to define a metric") +}); + // walreceiver metrics pub static WALRECEIVER_STARTED_CONNECTIONS: Lazy = Lazy::new(|| { @@ -663,7 +690,9 @@ pub struct StorageTimeMetrics { } impl StorageTimeMetrics { - pub fn new(operation: &str, tenant_id: &str, timeline_id: &str) -> Self { + pub fn new(operation: StorageTimeOperation, tenant_id: &str, timeline_id: &str) -> Self { + let operation: &'static str = operation.into(); + let timeline_sum = STORAGE_TIME_SUM_PER_TIMELINE .get_metric_with_label_values(&[operation, tenant_id, timeline_id]) .unwrap(); @@ -727,16 +756,23 @@ impl TimelineMetrics { let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let flush_time_histo = StorageTimeMetrics::new("layer flush", &tenant_id, &timeline_id); - let compact_time_histo = StorageTimeMetrics::new("compact", &tenant_id, &timeline_id); + let flush_time_histo = + StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id); + let compact_time_histo = + StorageTimeMetrics::new(StorageTimeOperation::Compact, &tenant_id, &timeline_id); let create_images_time_histo = - StorageTimeMetrics::new("create images", &tenant_id, &timeline_id); - let logical_size_histo = StorageTimeMetrics::new("logical size", &tenant_id, &timeline_id); - let imitate_logical_size_histo = - StorageTimeMetrics::new("imitate logical size", &tenant_id, &timeline_id); + StorageTimeMetrics::new(StorageTimeOperation::CreateImages, &tenant_id, &timeline_id); + let logical_size_histo = + StorageTimeMetrics::new(StorageTimeOperation::LogicalSize, &tenant_id, &timeline_id); + let imitate_logical_size_histo = StorageTimeMetrics::new( + StorageTimeOperation::ImitateLogicalSize, + &tenant_id, + &timeline_id, + ); let load_layer_map_histo = - StorageTimeMetrics::new("load layer map", &tenant_id, &timeline_id); - let garbage_collect_histo = StorageTimeMetrics::new("gc", &tenant_id, &timeline_id); + StorageTimeMetrics::new(StorageTimeOperation::LoadLayerMap, &tenant_id, &timeline_id); + let garbage_collect_histo = + StorageTimeMetrics::new(StorageTimeOperation::Gc, &tenant_id, &timeline_id); let last_record_gauge = LAST_RECORD_LSN .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); @@ -804,7 +840,7 @@ impl Drop for TimelineMetrics { .write() .unwrap() .remove(tenant_id, timeline_id); - for op in STORAGE_TIME_OPERATIONS { + for op in StorageTimeOperation::VARIANTS { let _ = STORAGE_TIME_SUM_PER_TIMELINE.remove_label_values(&[op, tenant_id, timeline_id]); let _ = @@ -1231,4 +1267,7 @@ pub fn preinitialize_metrics() { // Initialize it eagerly, so that our alert rule can distinguish absence of the metric from metric value 0. assert_eq!(UNEXPECTED_ONDEMAND_DOWNLOADS.get(), 0); UNEXPECTED_ONDEMAND_DOWNLOADS.reset(); + + // Same as above for this metric, but, it's a Vec-type metric for which we don't know all the labels. + BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT.reset(); } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c39ef9db10..6f4a55e83a 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -99,7 +99,9 @@ mod timeline; pub mod size; pub(crate) use timeline::debug_assert_current_span_has_tenant_and_timeline_id; -pub use timeline::{LocalLayerInfoForDiskUsageEviction, PageReconstructError, Timeline}; +pub use timeline::{ + LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline, +}; // re-export this function so that page_cache.rs can use it. pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file; @@ -607,12 +609,9 @@ impl Tenant { remote_storage: GenericRemoteStorage, ctx: &RequestContext, ) -> anyhow::Result> { - // XXX: Attach should provide the config, especially during tenant migration. - // See https://github.com/neondatabase/neon/issues/1555 - let tenant_conf = TenantConfOpt::default(); - - Self::attach_idempotent_create_marker_file(conf, tenant_id) - .context("create attach marker file")?; + // TODO dedup with spawn_load + let tenant_conf = + Self::load_tenant_config(conf, tenant_id).context("load tenant config")?; let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id)); let tenant = Arc::new(Tenant::new( @@ -649,45 +648,6 @@ impl Tenant { Ok(tenant) } - fn attach_idempotent_create_marker_file( - conf: &'static PageServerConf, - tenant_id: TenantId, - ) -> anyhow::Result<()> { - // Create directory with marker file to indicate attaching state. - // The load_local_tenants() function in tenant::mgr relies on the marker file - // to determine whether a tenant has finished attaching. - let tenant_dir = conf.tenant_path(&tenant_id); - let marker_file = conf.tenant_attaching_mark_file_path(&tenant_id); - debug_assert_eq!(marker_file.parent().unwrap(), tenant_dir); - // TODO: should use tokio::fs here, but - // 1. caller is not async, for good reason (it holds tenants map lock) - // 2. we'd need to think about cancel safety. Turns out dropping a tokio::fs future - // doesn't wait for the activity in the fs thread pool. - crashsafe::create_dir_all(&tenant_dir).context("create tenant directory")?; - match fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(&marker_file) - { - Ok(_) => {} - Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { - // Either this is a retry of attach or there is a concurrent task also doing attach for this tenant. - // We cannot distinguish this here. - // The caller is responsible for ensuring there's no concurrent attach for a tenant. - {} // fsync again, we don't know if that already happened - } - err => { - err.context("create tenant attaching marker file")?; - unreachable!("we covered the Ok() case above"); - } - } - crashsafe::fsync_file_and_parent(&marker_file) - .context("fsync tenant attaching marker file and parent")?; - debug_assert!(tenant_dir.is_dir()); - debug_assert!(marker_file.is_file()); - Ok(()) - } - /// /// Background task that downloads all data for a tenant and brings it to Active state. /// @@ -839,6 +799,8 @@ impl Tenant { remote_client: RemoteTimelineClient, ctx: &RequestContext, ) -> anyhow::Result<()> { + debug_assert_current_span_has_tenant_id(); + info!("downloading index file for timeline {}", timeline_id); tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id)) .await @@ -1103,6 +1065,8 @@ impl Tenant { local_metadata: TimelineMetadata, ctx: &RequestContext, ) -> anyhow::Result<()> { + debug_assert_current_span_has_tenant_id(); + let remote_client = self.remote_storage.as_ref().map(|remote_storage| { RemoteTimelineClient::new( remote_storage.clone(), @@ -1270,8 +1234,24 @@ impl Tenant { "Cannot create timelines on inactive tenant" ); - if self.get_timeline(new_timeline_id, false).await.is_ok() { + if let Ok(existing) = self.get_timeline(new_timeline_id, false).await { debug!("timeline {new_timeline_id} already exists"); + + if let Some(remote_client) = existing.remote_client.as_ref() { + // Wait for uploads to complete, so that when we return Ok, the timeline + // is known to be durable on remote storage. Just like we do at the end of + // this function, after we have created the timeline ourselves. + // + // We only really care that the initial version of `index_part.json` has + // been uploaded. That's enough to remember that the timeline + // exists. However, there is no function to wait specifically for that so + // we just wait for all in-progress uploads to finish. + remote_client + .wait_completion() + .await + .context("wait for timeline uploads to complete")?; + } + return Ok(None); } @@ -1314,6 +1294,17 @@ impl Tenant { } }; + if let Some(remote_client) = loaded_timeline.remote_client.as_ref() { + // Wait for the upload of the 'index_part.json` file to finish, so that when we return + // Ok, the timeline is durable in remote storage. + let kind = ancestor_timeline_id + .map(|_| "branched") + .unwrap_or("bootstrapped"); + remote_client.wait_completion().await.with_context(|| { + format!("wait for {} timeline initial uploads to complete", kind) + })?; + } + Ok(Some(loaded_timeline)) } @@ -1607,6 +1598,8 @@ impl Tenant { /// Changes tenant status to active, unless shutdown was already requested. pub async fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> { + debug_assert_current_span_has_tenant_id(); + let mut result = Ok(()); Self::state_send_modify_async(&self.state, |current_state| { match &*current_state { @@ -2146,6 +2139,7 @@ impl Tenant { // enough to just fsync it always. crashsafe::fsync(target_config_parent)?; + // XXX we're not fsyncing the parent dir, need to do that in case `creating_tenant` Ok(()) }; @@ -2432,9 +2426,10 @@ impl Tenant { src_timeline.initdb_lsn, src_timeline.pg_version, ); - let mut timelines = self.timelines.lock().await; - let new_timeline = self - .prepare_timeline( + + let new_timeline = { + let mut timelines = self.timelines.lock().await; + self.prepare_timeline( dst_id, &metadata, timeline_uninit_mark, @@ -2443,8 +2438,8 @@ impl Tenant { ) .await? .initialize_with_lock(ctx, &mut timelines, true, true) - .await?; - drop(timelines); + .await? + }; // Root timeline gets its layers during creation and uploads them along with the metadata. // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created. @@ -2705,6 +2700,7 @@ impl Tenant { // `max_retention_period` overrides the cutoff that is used to calculate the size // (only if it is shorter than the real cutoff). max_retention_period: Option, + cause: LogicalSizeCalculationCause, ctx: &RequestContext, ) -> anyhow::Result { let logical_sizes_at_once = self @@ -2726,6 +2722,7 @@ impl Tenant { logical_sizes_at_once, max_retention_period, &mut shared_cache, + cause, ctx, ) .await @@ -2735,8 +2732,12 @@ impl Tenant { /// This is periodically called by background worker. /// result is cached in tenant struct #[instrument(skip_all, fields(tenant_id=%self.tenant_id))] - pub async fn calculate_synthetic_size(&self, ctx: &RequestContext) -> anyhow::Result { - let inputs = self.gather_size_inputs(None, ctx).await?; + pub async fn calculate_synthetic_size( + &self, + cause: LogicalSizeCalculationCause, + ctx: &RequestContext, + ) -> anyhow::Result { + let inputs = self.gather_size_inputs(None, cause, ctx).await?; let size = inputs.calculate()?; @@ -2788,15 +2789,23 @@ fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> a Ok(()) } +pub(crate) enum CreateTenantFilesMode { + Create, + Attach, +} + pub(crate) fn create_tenant_files( conf: &'static PageServerConf, tenant_conf: TenantConfOpt, tenant_id: TenantId, + mode: CreateTenantFilesMode, ) -> anyhow::Result { let target_tenant_directory = conf.tenant_path(&tenant_id); anyhow::ensure!( - !target_tenant_directory.exists(), - "cannot create new tenant repo: '{tenant_id}' directory already exists", + !target_tenant_directory + .try_exists() + .context("check existence of tenant directory")?, + "tenant directory already exists", ); let temporary_tenant_dir = @@ -2818,6 +2827,7 @@ pub(crate) fn create_tenant_files( conf, tenant_conf, tenant_id, + mode, &temporary_tenant_dir, &target_tenant_directory, ); @@ -2842,9 +2852,28 @@ fn try_create_target_tenant_dir( conf: &'static PageServerConf, tenant_conf: TenantConfOpt, tenant_id: TenantId, + mode: CreateTenantFilesMode, temporary_tenant_dir: &Path, target_tenant_directory: &Path, ) -> Result<(), anyhow::Error> { + match mode { + CreateTenantFilesMode::Create => {} // needs no attach marker, writing tenant conf + atomic rename of dir is good enough + CreateTenantFilesMode::Attach => { + let attach_marker_path = temporary_tenant_dir.join(TENANT_ATTACHING_MARKER_FILENAME); + let file = std::fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&attach_marker_path) + .with_context(|| { + format!("could not create attach marker file {attach_marker_path:?}") + })?; + file.sync_all().with_context(|| { + format!("could not sync attach marker file: {attach_marker_path:?}") + })?; + // fsync of the directory in which the file resides comes later in this function + } + } + let temporary_tenant_timelines_dir = rebase_directory( &conf.timelines_path(&tenant_id), target_tenant_directory, @@ -2871,6 +2900,11 @@ fn try_create_target_tenant_dir( anyhow::bail!("failpoint tenant-creation-before-tmp-rename"); }); + // Make sure the current tenant directory entries are durable before renaming. + // Without this, a crash may reorder any of the directory entry creations above. + crashsafe::fsync(temporary_tenant_dir) + .with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?; + fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| { format!( "move tenant {} temporary directory {} into the permanent one {}", @@ -3534,14 +3568,26 @@ mod tests { .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx) .await?; + // The branchpoints should contain all timelines, even ones marked + // as Broken. + { + let branchpoints = &tline.gc_info.read().unwrap().retain_lsns; + assert_eq!(branchpoints.len(), 1); + assert_eq!(branchpoints[0], Lsn(0x40)); + } + + // You can read the key from the child branch even though the parent is + // Broken, as long as you don't need to access data from the parent. assert_eq!( - newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?, - TEST_IMG(&format!("foo at {}", Lsn(0x40))) + newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?, + TEST_IMG(&format!("foo at {}", Lsn(0x70))) ); - let branchpoints = &tline.gc_info.read().unwrap().retain_lsns; - assert_eq!(branchpoints.len(), 1); - assert_eq!(branchpoints[0], Lsn(0x40)); + // This needs to traverse to the parent, and fails. + let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err(); + assert!(err + .to_string() + .contains("will not become active. Current state: Broken")); Ok(()) } @@ -4078,3 +4124,28 @@ mod tests { Ok(()) } } + +#[cfg(not(debug_assertions))] +#[inline] +pub(crate) fn debug_assert_current_span_has_tenant_id() {} + +#[cfg(debug_assertions)] +pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy< + utils::tracing_span_assert::MultiNameExtractor<2>, +> = once_cell::sync::Lazy::new(|| { + utils::tracing_span_assert::MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"]) +}); + +#[cfg(debug_assertions)] +#[inline] +pub(crate) fn debug_assert_current_span_has_tenant_id() { + use utils::tracing_span_assert; + + match tracing_span_assert::check_fields_present([&*TENANT_ID_EXTRACTOR]) { + Ok(()) => (), + Err(missing) => panic!( + "missing extractors: {:?}", + missing.into_iter().map(|e| e.name()).collect::>() + ), + } +} diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 50ce942a09..50de316bc4 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -9,7 +9,7 @@ //! may lead to a data loss. //! use anyhow::Context; -use pageserver_api::models::{TenantConfigRequest, TenantCreateRequest}; +use pageserver_api::models; use serde::{Deserialize, Serialize}; use std::num::NonZeroU64; use std::time::Duration; @@ -292,10 +292,10 @@ fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn() move || format!("Cannot parse `{field_name}` duration {value:?}") } -impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt { +impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt { type Error = anyhow::Error; - fn try_from(request_data: &TenantCreateRequest) -> Result { + fn try_from(request_data: &'_ models::TenantConfig) -> Result { let mut tenant_conf = TenantConfOpt::default(); if let Some(gc_period) = &request_data.gc_period { @@ -377,84 +377,6 @@ impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt { } } -impl TryFrom<&'_ TenantConfigRequest> for TenantConfOpt { - type Error = anyhow::Error; - - fn try_from(request_data: &TenantConfigRequest) -> Result { - let mut tenant_conf = TenantConfOpt::default(); - if let Some(gc_period) = &request_data.gc_period { - tenant_conf.gc_period = Some( - humantime::parse_duration(gc_period) - .with_context(bad_duration("gc_period", gc_period))?, - ); - } - tenant_conf.gc_horizon = request_data.gc_horizon; - tenant_conf.image_creation_threshold = request_data.image_creation_threshold; - - if let Some(pitr_interval) = &request_data.pitr_interval { - tenant_conf.pitr_interval = Some( - humantime::parse_duration(pitr_interval) - .with_context(bad_duration("pitr_interval", pitr_interval))?, - ); - } - if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout { - tenant_conf.walreceiver_connect_timeout = Some( - humantime::parse_duration(walreceiver_connect_timeout).with_context( - bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout), - )?, - ); - } - if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout { - tenant_conf.lagging_wal_timeout = Some( - humantime::parse_duration(lagging_wal_timeout) - .with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?, - ); - } - tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag; - tenant_conf.trace_read_requests = request_data.trace_read_requests; - - tenant_conf.checkpoint_distance = request_data.checkpoint_distance; - if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout { - tenant_conf.checkpoint_timeout = Some( - humantime::parse_duration(checkpoint_timeout) - .with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?, - ); - } - tenant_conf.compaction_target_size = request_data.compaction_target_size; - tenant_conf.compaction_threshold = request_data.compaction_threshold; - - if let Some(compaction_period) = &request_data.compaction_period { - tenant_conf.compaction_period = Some( - humantime::parse_duration(compaction_period) - .with_context(bad_duration("compaction_period", compaction_period))?, - ); - } - - if let Some(eviction_policy) = &request_data.eviction_policy { - tenant_conf.eviction_policy = Some( - serde::Deserialize::deserialize(eviction_policy) - .context("parse field `eviction_policy`")?, - ); - } - - tenant_conf.min_resident_size_override = request_data.min_resident_size_override; - - if let Some(evictions_low_residence_duration_metric_threshold) = - &request_data.evictions_low_residence_duration_metric_threshold - { - tenant_conf.evictions_low_residence_duration_metric_threshold = Some( - humantime::parse_duration(evictions_low_residence_duration_metric_threshold) - .with_context(bad_duration( - "evictions_low_residence_duration_metric_threshold", - evictions_low_residence_duration_metric_threshold, - ))?, - ); - } - - Ok(tenant_conf) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 0ac8c9193f..d7d20f1cc7 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -19,7 +19,7 @@ use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; -use crate::tenant::{Tenant, TenantState}; +use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState}; use crate::IGNORED_TENANT_FILE_NAME; use utils::fs_ext::PathExt; @@ -282,9 +282,15 @@ pub async fn create_tenant( // We're holding the tenants lock in write mode while doing local IO. // If this section ever becomes contentious, introduce a new `TenantState::Creating` // and do the work in that state. - let tenant_directory = super::create_tenant_files(conf, tenant_conf, tenant_id)?; + let tenant_directory = super::create_tenant_files(conf, tenant_conf, tenant_id, CreateTenantFilesMode::Create)?; + // TODO: tenant directory remains on disk if we bail out from here on. + // See https://github.com/neondatabase/neon/issues/4233 + let created_tenant = schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?; + // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. + // See https://github.com/neondatabase/neon/issues/4233 + let crated_tenant_id = created_tenant.tenant_id(); anyhow::ensure!( tenant_id == crated_tenant_id, @@ -466,19 +472,32 @@ pub async fn list_tenants() -> Result, TenantMapLis pub async fn attach_tenant( conf: &'static PageServerConf, tenant_id: TenantId, + tenant_conf: TenantConfOpt, remote_storage: GenericRemoteStorage, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { tenant_map_insert(tenant_id, |vacant_entry| { - let tenant_path = conf.tenant_path(&tenant_id); - anyhow::ensure!( - !tenant_path.exists(), - "Cannot attach tenant {tenant_id}, local tenant directory already exists" - ); + let tenant_dir = create_tenant_files(conf, tenant_conf, tenant_id, CreateTenantFilesMode::Attach)?; + // TODO: tenant directory remains on disk if we bail out from here on. + // See https://github.com/neondatabase/neon/issues/4233 - let tenant = - Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx).context("spawn_attach")?; - vacant_entry.insert(tenant); + // Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached + let marker_file_exists = conf + .tenant_attaching_mark_file_path(&tenant_id) + .try_exists() + .context("check for attach marker file existence")?; + anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file"); + + let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, Some(remote_storage), ctx)?; + // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. + // See https://github.com/neondatabase/neon/issues/4233 + + let attached_tenant_id = attached_tenant.tenant_id(); + anyhow::ensure!( + tenant_id == attached_tenant_id, + "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})", + ); + vacant_entry.insert(Arc::clone(&attached_tenant)); Ok(()) }) .await diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index d05c103573..d243137281 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken; use crate::context::RequestContext; use crate::pgdatadir_mapping::CalculateLogicalSizeError; -use super::Tenant; +use super::{LogicalSizeCalculationCause, Tenant}; use crate::tenant::Timeline; use utils::id::TimelineId; use utils::lsn::Lsn; @@ -126,6 +126,7 @@ pub(super) async fn gather_inputs( limit: &Arc, max_retention_period: Option, logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, + cause: LogicalSizeCalculationCause, ctx: &RequestContext, ) -> anyhow::Result { // refresh is needed to update gc related pitr_cutoff and horizon_cutoff @@ -318,7 +319,15 @@ pub(super) async fn gather_inputs( // We left the 'size' field empty in all of the Segments so far. // Now find logical sizes for all of the points that might need or benefit from them. - fill_logical_sizes(&timelines, &mut segments, limit, logical_size_cache, ctx).await?; + fill_logical_sizes( + &timelines, + &mut segments, + limit, + logical_size_cache, + cause, + ctx, + ) + .await?; Ok(ModelInputs { segments, @@ -336,6 +345,7 @@ async fn fill_logical_sizes( segments: &mut [SegmentMeta], limit: &Arc, logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, + cause: LogicalSizeCalculationCause, ctx: &RequestContext, ) -> anyhow::Result<()> { let timeline_hash: HashMap> = HashMap::from_iter( @@ -378,6 +388,7 @@ async fn fill_logical_sizes( parallel_size_calcs, timeline, lsn, + cause, ctx, cancel.child_token(), ) @@ -485,6 +496,7 @@ async fn calculate_logical_size( limit: Arc, timeline: Arc, lsn: utils::lsn::Lsn, + cause: LogicalSizeCalculationCause, ctx: RequestContext, cancel: CancellationToken, ) -> Result { @@ -493,7 +505,7 @@ async fn calculate_logical_size( .expect("global semaphore should not had been closed"); let size_res = timeline - .spawn_ondemand_logical_size_calculation(lsn, ctx, cancel) + .spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel) .instrument(info_span!("spawn_ondemand_logical_size_calculation")) .await?; Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res)) diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 7e7dbd3c5c..6bf26f1da1 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -259,6 +259,7 @@ pub(crate) async fn random_init_delay( } } +/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric. pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) { // Duration::ZERO will happen because it's the "disable [bgtask]" value. if elapsed >= period && period != Duration::ZERO { @@ -271,5 +272,8 @@ pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task task, "task iteration took longer than the configured period" ); + crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT + .with_label_values(&[task, &format!("{}", period.as_secs())]) + .inc(); } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ea0dc43f54..749100a274 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -49,7 +49,7 @@ use crate::tenant::{ use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace}; -use crate::metrics::{StorageTimeMetrics, TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS}; +use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError}; @@ -398,6 +398,9 @@ pub enum PageReconstructError { /// The operation was cancelled Cancelled, + /// The ancestor of this is being stopped + AncestorStopping(TimelineId), + /// An error happened replaying WAL records #[error(transparent)] WalRedo(#[from] crate::walredo::WalRedoError), @@ -416,6 +419,9 @@ impl std::fmt::Debug for PageReconstructError { ) } Self::Cancelled => write!(f, "cancelled"), + Self::AncestorStopping(timeline_id) => { + write!(f, "ancestor timeline {timeline_id} is being stopped") + } Self::WalRedo(err) => err.fmt(f), } } @@ -434,11 +440,22 @@ impl std::fmt::Display for PageReconstructError { ) } Self::Cancelled => write!(f, "cancelled"), + Self::AncestorStopping(timeline_id) => { + write!(f, "ancestor timeline {timeline_id} is being stopped") + } Self::WalRedo(err) => err.fmt(f), } } } +#[derive(Clone, Copy)] +pub enum LogicalSizeCalculationCause { + Initial, + ConsumptionMetricsSyntheticSize, + EvictionTaskImitation, + TenantSizeHandler, +} + /// Public interface functions impl Timeline { /// Get the LSN where this branch was created @@ -929,6 +946,31 @@ impl Timeline { self.state.subscribe() } + pub async fn wait_to_become_active( + &self, + _ctx: &RequestContext, // Prepare for use by cancellation + ) -> Result<(), TimelineState> { + let mut receiver = self.state.subscribe(); + loop { + let current_state = *receiver.borrow_and_update(); + match current_state { + TimelineState::Loading => { + receiver + .changed() + .await + .expect("holding a reference to self"); + } + TimelineState::Active { .. } => { + return Ok(()); + } + TimelineState::Broken { .. } | TimelineState::Stopping => { + // There's no chance the timeline can transition back into ::Active + return Err(current_state); + } + } + } + } + pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { let layer_map = self.layers.read().await; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); @@ -1842,18 +1884,31 @@ impl Timeline { // to spawn_ondemand_logical_size_calculation. let cancel = CancellationToken::new(); let calculated_size = match self_clone - .logical_size_calculation_task(lsn, &background_ctx, cancel) + .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel) .await { Ok(s) => s, Err(CalculateLogicalSizeError::Cancelled) => { // Don't make noise, this is a common task. - // In the unlikely case that there ihs another call to this function, we'll retry + // In the unlikely case that there is another call to this function, we'll retry // because initial_logical_size is still None. info!("initial size calculation cancelled, likely timeline delete / tenant detach"); return Ok(()); } - x @ Err(_) => x.context("Failed to calculate logical size")?, + Err(CalculateLogicalSizeError::Other(err)) => { + if let Some(e @ PageReconstructError::AncestorStopping(_)) = + err.root_cause().downcast_ref() + { + // This can happen if the timeline parent timeline switches to + // Stopping state while we're still calculating the initial + // timeline size for the child, for example if the tenant is + // being detached or the pageserver is shut down. Like with + // CalculateLogicalSizeError::Cancelled, don't make noise. + info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}"); + return Ok(()); + } + return Err(err.context("Failed to calculate logical size")); + } }; // we cannot query current_logical_size.current_size() to know the current @@ -1896,6 +1951,7 @@ impl Timeline { pub fn spawn_ondemand_logical_size_calculation( self: &Arc, lsn: Lsn, + cause: LogicalSizeCalculationCause, ctx: RequestContext, cancel: CancellationToken, ) -> oneshot::Receiver> { @@ -1918,7 +1974,7 @@ impl Timeline { false, async move { let res = self_clone - .logical_size_calculation_task(lsn, &ctx, cancel) + .logical_size_calculation_task(lsn, cause, &ctx, cancel) .await; let _ = sender.send(res).ok(); Ok(()) // Receiver is responsible for handling errors @@ -1932,6 +1988,7 @@ impl Timeline { async fn logical_size_calculation_task( self: &Arc, lsn: Lsn, + cause: LogicalSizeCalculationCause, ctx: &RequestContext, cancel: CancellationToken, ) -> Result { @@ -1944,12 +2001,7 @@ impl Timeline { let cancel = cancel.child_token(); let ctx = ctx.attached_child(); self_calculation - .calculate_logical_size( - lsn, - &self_calculation.metrics.logical_size_histo, - cancel, - &ctx, - ) + .calculate_logical_size(lsn, cause, cancel, &ctx) .await }); let timeline_state_cancellation = async { @@ -2004,7 +2056,7 @@ impl Timeline { pub async fn calculate_logical_size( &self, up_to_lsn: Lsn, - storage_time_metrics: &StorageTimeMetrics, + cause: LogicalSizeCalculationCause, cancel: CancellationToken, ctx: &RequestContext, ) -> Result { @@ -2038,6 +2090,14 @@ impl Timeline { if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) { return Ok(size); } + let storage_time_metrics = match cause { + LogicalSizeCalculationCause::Initial + | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize + | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo, + LogicalSizeCalculationCause::EvictionTaskImitation => { + &self.metrics.imitate_logical_size_histo + } + }; let timer = storage_time_metrics.start_timer(); let logical_size = self .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx) @@ -2248,6 +2308,46 @@ impl Timeline { Ok(timeline) => timeline, Err(e) => return Err(PageReconstructError::from(e)), }; + + // It's possible that the ancestor timeline isn't active yet, or + // is active but hasn't yet caught up to the branch point. Wait + // for it. + // + // This cannot happen while the pageserver is running normally, + // because you cannot create a branch from a point that isn't + // present in the pageserver yet. However, we don't wait for the + // branch point to be uploaded to cloud storage before creating + // a branch. I.e., the branch LSN need not be remote consistent + // for the branching operation to succeed. + // + // Hence, if we try to load a tenant in such a state where + // 1. the existence of the branch was persisted (in IndexPart and/or locally) + // 2. but the ancestor state is behind branch_lsn because it was not yet persisted + // then we will need to wait for the ancestor timeline to + // re-stream WAL up to branch_lsn before we access it. + // + // How can a tenant get in such a state? + // - ungraceful pageserver process exit + // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219 + // + // NB: this could be avoided by requiring + // branch_lsn >= remote_consistent_lsn + // during branch creation. + match ancestor.wait_to_become_active(ctx).await { + Ok(()) => {} + Err(state) if state == TimelineState::Stopping => { + return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id)); + } + Err(state) => { + return Err(PageReconstructError::Other(anyhow::anyhow!( + "Timeline {} will not become active. Current state: {:?}", + ancestor.timeline_id, + &state, + ))); + } + } + ancestor.wait_lsn(timeline.ancestor_lsn, ctx).await?; + timeline_owned = ancestor; timeline = &*timeline_owned; prev_lsn = Lsn(u64::MAX); @@ -4388,12 +4488,6 @@ pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {} pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() { use utils::tracing_span_assert; - pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy< - tracing_span_assert::MultiNameExtractor<2>, - > = once_cell::sync::Lazy::new(|| { - tracing_span_assert::MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"]) - }); - pub static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy< tracing_span_assert::MultiNameExtractor<2>, > = once_cell::sync::Lazy::new(|| { @@ -4401,7 +4495,7 @@ pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() { }); match tracing_span_assert::check_fields_present([ - &*TENANT_ID_EXTRACTOR, + &*super::TENANT_ID_EXTRACTOR, &*TIMELINE_ID_EXTRACTOR, ]) { Ok(()) => (), diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 64c44a1974..a7f24c52ed 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -30,7 +30,7 @@ use crate::{ tenant::{ config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold}, storage_layer::PersistentLayer, - Tenant, + LogicalSizeCalculationCause, Tenant, }, }; @@ -344,7 +344,7 @@ impl Timeline { let size = self .calculate_logical_size( lsn, - &self.metrics.imitate_logical_size_histo, + LogicalSizeCalculationCause::EvictionTaskImitation, cancel.clone(), ctx, ) @@ -419,9 +419,15 @@ impl Timeline { .inner(); let mut throwaway_cache = HashMap::new(); - let gather = - crate::tenant::size::gather_inputs(tenant, limit, None, &mut throwaway_cache, ctx) - .instrument(info_span!("gather_inputs")); + let gather = crate::tenant::size::gather_inputs( + tenant, + limit, + None, + &mut throwaway_cache, + LogicalSizeCalculationCause::EvictionTaskImitation, + ctx, + ) + .instrument(info_span!("gather_inputs")); tokio::select! { _ = cancel.cancelled() => {} diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 746d3c597e..2fdfa88209 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -28,8 +28,8 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use storage_broker::BrokerClientChannel; use storage_broker::Streaming; +use tokio::select; use tokio::sync::RwLock; -use tokio::{select, sync::watch}; use tracing::*; use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS}; @@ -50,13 +50,13 @@ pub(super) async fn connection_manager_loop_step( ctx: &RequestContext, manager_status: &RwLock>, ) -> ControlFlow<(), ()> { - let mut timeline_state_updates = connection_manager_state + match connection_manager_state .timeline - .subscribe_for_state_updates(); - - match wait_for_active_timeline(&mut timeline_state_updates).await { - ControlFlow::Continue(()) => {} - ControlFlow::Break(()) => { + .wait_to_become_active(ctx) + .await + { + Ok(()) => {} + Err(_) => { info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop"); return ControlFlow::Break(()); } @@ -72,6 +72,10 @@ pub(super) async fn connection_manager_loop_step( timeline_id: connection_manager_state.timeline.timeline_id, }; + let mut timeline_state_updates = connection_manager_state + .timeline + .subscribe_for_state_updates(); + // Subscribe to the broker updates. Stream shares underlying TCP connection // with other streams on this client (other connection managers). When // object goes out of scope, stream finishes in drop() automatically. @@ -195,34 +199,6 @@ pub(super) async fn connection_manager_loop_step( } } -async fn wait_for_active_timeline( - timeline_state_updates: &mut watch::Receiver, -) -> ControlFlow<(), ()> { - let current_state = *timeline_state_updates.borrow(); - if current_state == TimelineState::Active { - return ControlFlow::Continue(()); - } - - loop { - match timeline_state_updates.changed().await { - Ok(()) => { - let new_state = *timeline_state_updates.borrow(); - match new_state { - TimelineState::Active => { - debug!("Timeline state changed to active, continuing the walreceiver connection manager"); - return ControlFlow::Continue(()); - } - state => { - debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}"); - continue; - } - } - } - Err(_sender_dropped_error) => return ControlFlow::Break(()), - } - } -} - /// Endlessly try to subscribe for broker updates for a given timeline. async fn subscribe_for_timeline_updates( broker_client: &mut BrokerClientChannel, diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 48ced59399..fc0cc5c81e 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -305,6 +305,15 @@ impl<'a> WalIngest<'a> { self.checkpoint_modified = true; } } + } else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_LOGICAL_MESSAGE { + // This is a convenient way to make the WAL ingestion pause at + // particular point in the WAL. For more fine-grained control, + // we could peek into the message and only pause if it contains + // a particular string, for example, but this is enough for now. + utils::failpoint_sleep_millis_async!("wal-ingest-logical-message-sleep"); + } } // Iterate through all the blocks that the record modifies, and diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 21330c018f..606af9741f 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -192,8 +192,9 @@ retry: { if (!PQconsumeInput(pageserver_conn)) { - neon_log(LOG, "could not get response from pageserver: %s", - PQerrorMessage(pageserver_conn)); + char *msg = pchomp(PQerrorMessage(pageserver_conn)); + neon_log(LOG, "could not get response from pageserver: %s", msg); + pfree(msg); return -1; } } @@ -343,7 +344,7 @@ pageserver_receive(void) resp = NULL; } else if (rc == -2) - neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn)); + neon_log(ERROR, "could not read COPY data: %s", pchomp(PQerrorMessage(pageserver_conn))); else neon_log(ERROR, "unexpected PQgetCopyData return value: %d", rc); } @@ -367,7 +368,7 @@ pageserver_flush(void) } else if (PQflush(pageserver_conn)) { - char *msg = PQerrorMessage(pageserver_conn); + char *msg = pchomp(PQerrorMessage(pageserver_conn)); pageserver_disconnect(); neon_log(ERROR, "failed to flush page requests: %s", msg); diff --git a/proxy/src/auth.rs b/proxy/src/auth.rs index dfea84953b..58dceb3bb6 100644 --- a/proxy/src/auth.rs +++ b/proxy/src/auth.rs @@ -7,6 +7,7 @@ mod credentials; pub use credentials::ClientCredentials; mod password_hack; +pub use password_hack::parse_endpoint_param; use password_hack::PasswordHackPayload; mod flow; @@ -44,10 +45,10 @@ pub enum AuthErrorImpl { #[error( "Endpoint ID is not specified. \ Either please upgrade the postgres client library (libpq) for SNI support \ - or pass the endpoint ID (first part of the domain name) as a parameter: '?options=project%3D'. \ + or pass the endpoint ID (first part of the domain name) as a parameter: '?options=endpoint%3D'. \ See more at https://neon.tech/sni" )] - MissingProjectName, + MissingEndpointName, #[error("password authentication failed for user '{0}'")] AuthFailed(Box), @@ -88,7 +89,7 @@ impl UserFacingError for AuthError { AuthFailed(_) => self.to_string(), BadAuthMethod(_) => self.to_string(), MalformedPassword(_) => self.to_string(), - MissingProjectName => self.to_string(), + MissingEndpointName => self.to_string(), Io(_) => "Internal error".to_string(), } } diff --git a/proxy/src/auth/backend/hacks.rs b/proxy/src/auth/backend/hacks.rs index d45806461e..dcc93ec04c 100644 --- a/proxy/src/auth/backend/hacks.rs +++ b/proxy/src/auth/backend/hacks.rs @@ -52,8 +52,8 @@ pub async fn password_hack( .authenticate() .await?; - info!(project = &payload.project, "received missing parameter"); - creds.project = Some(payload.project); + info!(project = &payload.endpoint, "received missing parameter"); + creds.project = Some(payload.endpoint); let mut node = api.wake_compute(extra, creds).await?; node.config.password(payload.password); diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index b21cd79ddf..6787d82b71 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -1,6 +1,7 @@ //! User credentials used in authentication. -use crate::error::UserFacingError; +use crate::{auth::password_hack::parse_endpoint_param, error::UserFacingError}; +use itertools::Itertools; use pq_proto::StartupMessageParams; use std::collections::HashSet; use thiserror::Error; @@ -61,7 +62,15 @@ impl<'a> ClientCredentials<'a> { // Project name might be passed via PG's command-line options. let project_option = params .options_raw() - .and_then(|mut options| options.find_map(|opt| opt.strip_prefix("project="))) + .and_then(|options| { + // We support both `project` (deprecated) and `endpoint` options for backward compatibility. + // However, if both are present, we don't exactly know which one to use. + // Therefore we require that only one of them is present. + options + .filter_map(parse_endpoint_param) + .at_most_one() + .ok()? + }) .map(|name| name.to_string()); let project_from_domain = if let Some(sni_str) = sni { @@ -177,6 +186,51 @@ mod tests { Ok(()) } + #[test] + fn parse_endpoint_from_options() -> anyhow::Result<()> { + let options = StartupMessageParams::new([ + ("user", "john_doe"), + ("options", "-ckey=1 endpoint=bar -c geqo=off"), + ]); + + let creds = ClientCredentials::parse(&options, None, None)?; + assert_eq!(creds.user, "john_doe"); + assert_eq!(creds.project.as_deref(), Some("bar")); + + Ok(()) + } + + #[test] + fn parse_three_endpoints_from_options() -> anyhow::Result<()> { + let options = StartupMessageParams::new([ + ("user", "john_doe"), + ( + "options", + "-ckey=1 endpoint=one endpoint=two endpoint=three -c geqo=off", + ), + ]); + + let creds = ClientCredentials::parse(&options, None, None)?; + assert_eq!(creds.user, "john_doe"); + assert!(creds.project.is_none()); + + Ok(()) + } + + #[test] + fn parse_when_endpoint_and_project_are_in_options() -> anyhow::Result<()> { + let options = StartupMessageParams::new([ + ("user", "john_doe"), + ("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"), + ]); + + let creds = ClientCredentials::parse(&options, None, None)?; + assert_eq!(creds.user, "john_doe"); + assert!(creds.project.is_none()); + + Ok(()) + } + #[test] fn parse_projects_identical() -> anyhow::Result<()> { let options = StartupMessageParams::new([("user", "john_doe"), ("options", "project=baz")]); diff --git a/proxy/src/auth/flow.rs b/proxy/src/auth/flow.rs index 4b982c0c5e..190abc9b2e 100644 --- a/proxy/src/auth/flow.rs +++ b/proxy/src/auth/flow.rs @@ -91,7 +91,7 @@ impl AuthFlow<'_, S, PasswordHack> { // the user neither enabled SNI nor resorted to any other method // for passing the project name we rely on. We should show them // the most helpful error message and point to the documentation. - .ok_or(AuthErrorImpl::MissingProjectName)?; + .ok_or(AuthErrorImpl::MissingEndpointName)?; Ok(payload) } diff --git a/proxy/src/auth/password_hack.rs b/proxy/src/auth/password_hack.rs index 639809e18a..33441e8c88 100644 --- a/proxy/src/auth/password_hack.rs +++ b/proxy/src/auth/password_hack.rs @@ -6,27 +6,55 @@ use bstr::ByteSlice; pub struct PasswordHackPayload { - pub project: String, + pub endpoint: String, pub password: Vec, } impl PasswordHackPayload { pub fn parse(bytes: &[u8]) -> Option { // The format is `project=;`. - let mut iter = bytes.strip_prefix(b"project=")?.splitn_str(2, ";"); - let project = iter.next()?.to_str().ok()?.to_owned(); + let mut iter = bytes.splitn_str(2, ";"); + let endpoint = iter.next()?.to_str().ok()?; + let endpoint = parse_endpoint_param(endpoint)?.to_owned(); let password = iter.next()?.to_owned(); - Some(Self { project, password }) + Some(Self { endpoint, password }) } } +pub fn parse_endpoint_param(bytes: &str) -> Option<&str> { + bytes + .strip_prefix("project=") + .or_else(|| bytes.strip_prefix("endpoint=")) +} + #[cfg(test)] mod tests { use super::*; #[test] - fn parse_password_hack_payload() { + fn parse_endpoint_param_fn() { + let input = ""; + assert!(parse_endpoint_param(input).is_none()); + + let input = "project="; + assert_eq!(parse_endpoint_param(input), Some("")); + + let input = "project=foobar"; + assert_eq!(parse_endpoint_param(input), Some("foobar")); + + let input = "endpoint="; + assert_eq!(parse_endpoint_param(input), Some("")); + + let input = "endpoint=foobar"; + assert_eq!(parse_endpoint_param(input), Some("foobar")); + + let input = "other_option=foobar"; + assert!(parse_endpoint_param(input).is_none()); + } + + #[test] + fn parse_password_hack_payload_project() { let bytes = b""; assert!(PasswordHackPayload::parse(bytes).is_none()); @@ -34,13 +62,33 @@ mod tests { assert!(PasswordHackPayload::parse(bytes).is_none()); let bytes = b"project=;"; - let payload = PasswordHackPayload::parse(bytes).expect("parsing failed"); - assert_eq!(payload.project, ""); + let payload: PasswordHackPayload = + PasswordHackPayload::parse(bytes).expect("parsing failed"); + assert_eq!(payload.endpoint, ""); assert_eq!(payload.password, b""); let bytes = b"project=foobar;pass;word"; let payload = PasswordHackPayload::parse(bytes).expect("parsing failed"); - assert_eq!(payload.project, "foobar"); + assert_eq!(payload.endpoint, "foobar"); + assert_eq!(payload.password, b"pass;word"); + } + + #[test] + fn parse_password_hack_payload_endpoint() { + let bytes = b""; + assert!(PasswordHackPayload::parse(bytes).is_none()); + + let bytes = b"endpoint="; + assert!(PasswordHackPayload::parse(bytes).is_none()); + + let bytes = b"endpoint=;"; + let payload = PasswordHackPayload::parse(bytes).expect("parsing failed"); + assert_eq!(payload.endpoint, ""); + assert_eq!(payload.password, b""); + + let bytes = b"endpoint=foobar;pass;word"; + let payload = PasswordHackPayload::parse(bytes).expect("parsing failed"); + assert_eq!(payload.endpoint, "foobar"); assert_eq!(payload.password, b"pass;word"); } } diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index d277940a12..480acb88d9 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -1,4 +1,4 @@ -use crate::{cancellation::CancelClosure, error::UserFacingError}; +use crate::{auth::parse_endpoint_param, cancellation::CancelClosure, error::UserFacingError}; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; use pq_proto::StartupMessageParams; @@ -279,7 +279,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option { #[allow(unstable_name_collisions)] let options: String = params .options_raw()? - .filter(|opt| !opt.starts_with("project=")) + .filter(|opt| parse_endpoint_param(opt).is_none()) .intersperse(" ") // TODO: use impl from std once it's stabilized .collect(); diff --git a/scripts/pr-comment-test-report.js b/scripts/pr-comment-test-report.js index 7cb2a5494f..3a7bba0daa 100644 --- a/scripts/pr-comment-test-report.js +++ b/scripts/pr-comment-test-report.js @@ -36,11 +36,9 @@ module.exports = async ({ github, context, fetch, report }) => { // Marker to find the comment in the subsequent runs const startMarker = `` // Let users know that the comment is updated automatically - const autoupdateNotice = `
The comment gets automatically updated with the latest test results :recycle:
` + const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:
` // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) const githubActionsBotId = 41898282 - // The latest commit in the PR URL - const commitUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/pull/${context.payload.number}/commits/${context.payload.pull_request.head.sha}` // Commend body itself let commentBody = `${startMarker}\n` @@ -74,36 +72,46 @@ module.exports = async ({ github, context, fetch, report }) => { let flakyTestsCount = 0 const pgVersions = new Set() - const buildTypes = new Set() for (const parentSuite of suites.children) { for (const suite of parentSuite.children) { for (const test of suite.children) { - const {groups: {buildType, pgVersion}} = test.name.match(/[\[-](?debug|release)-pg(?\d+)[-\]]/) + let buildType, pgVersion + const match = test.name.match(/[\[-](?debug|release)-pg(?\d+)[-\]]/)?.groups + if (match) { + ({buildType, pgVersion} = match) + } else { + // It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance). + console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`) + + buildType = "release" + pgVersion = "14" + } pgVersions.add(pgVersion) - buildTypes.add(buildType) // Removing build type and PostgreSQL version from the test name to make it shorter const testName = test.name.replace(new RegExp(`${buildType}-pg${pgVersion}-?`), "").replace("[]", "") test.pytestName = `${parentSuite.name.replace(".", "/")}/${suite.name}.py::${testName}` + test.pgVersion = pgVersion + test.buildType = buildType if (test.status === "passed") { - passedTests[pgVersion][buildType].push(test) + passedTests[pgVersion][testName].push(test) passedTestsCount += 1 } else if (test.status === "failed" || test.status === "broken") { - failedTests[pgVersion][buildType].push(test) + failedTests[pgVersion][testName].push(test) failedTestsCount += 1 } else if (test.status === "skipped") { - skippedTests[pgVersion][buildType].push(test) + skippedTests[pgVersion][testName].push(test) skippedTestsCount += 1 } if (test.retriesCount > 0) { - retriedTests[pgVersion][buildType].push(test) + retriedTests[pgVersion][testName].push(test) if (test.retriesStatusChange) { - flakyTests[pgVersion][buildType].push(test) + flakyTests[pgVersion][testName].push(test) flakyTestsCount += 1 } } @@ -112,39 +120,44 @@ module.exports = async ({ github, context, fetch, report }) => { } const totalTestsCount = failedTestsCount + passedTestsCount + skippedTestsCount - commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}) for ${commitUrl})\n___\n` + commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n` - // Print test resuls from the newest to the oldest PostgreSQL version for release and debug builds. + // Print test resuls from the newest to the oldest Postgres version for release and debug builds. for (const pgVersion of Array.from(pgVersions).sort().reverse()) { - for (const buildType of Array.from(buildTypes).sort().reverse()) { - if (failedTests[pgVersion][buildType].length > 0) { - commentBody += `#### PostgreSQL ${pgVersion} (${buildType} build)\n\n` - commentBody += `Failed tests:\n` - for (const test of failedTests[pgVersion][buildType]) { + if (Object.keys(failedTests[pgVersion]).length > 0) { + commentBody += `#### Failures on Posgres ${pgVersion}\n\n` + for (const [testName, tests] of Object.entries(failedTests[pgVersion])) { + const links = [] + for (const test of tests) { const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}` - - commentBody += `- [\`${test.pytestName}\`](${allureLink})` - if (test.retriesCount > 0) { - commentBody += ` (ran [${test.retriesCount + 1} times](${allureLink}/retries))` - } - commentBody += "\n" + links.push(`[${test.buildType}](${allureLink})`) } - commentBody += "\n" + commentBody += `- \`${testName}\`: ${links.join(", ")}\n` } + + const testsToRerun = Object.values(failedTests[pgVersion]).map(x => x[0].name) + const command = `DEFAULT_PG_VERSION=${pgVersion} scripts/pytest -k "${testsToRerun.join(" or ")}"` + + commentBody += "```\n" + commentBody += `# Run failed on Postgres ${pgVersion} tests locally:\n` + commentBody += `${command}\n` + commentBody += "```\n" } } if (flakyTestsCount > 0) { - commentBody += "
\nFlaky tests\n\n" + commentBody += `
\nFlaky tests (${flakyTestsCount})\n\n` for (const pgVersion of Array.from(pgVersions).sort().reverse()) { - for (const buildType of Array.from(buildTypes).sort().reverse()) { - if (flakyTests[pgVersion][buildType].length > 0) { - commentBody += `#### PostgreSQL ${pgVersion} (${buildType} build)\n\n` - for (const test of flakyTests[pgVersion][buildType]) { + if (Object.keys(flakyTests[pgVersion]).length > 0) { + commentBody += `#### Postgres ${pgVersion}\n\n` + for (const [testName, tests] of Object.entries(flakyTests[pgVersion])) { + const links = [] + for (const test of tests) { + const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}/retries` const status = test.status === "passed" ? ":white_check_mark:" : ":x:" - commentBody += `- ${status} [\`${test.pytestName}\`](${reportUrl}#suites/${test.parentUid}/${test.uid}/retries)\n` + links.push(`[${status} ${test.buildType}](${allureLink})`) } - commentBody += "\n" + commentBody += `- \`${testName}\`: ${links.join(", ")}\n` } } } diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 388e834b56..1ff057fae2 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -272,6 +272,7 @@ class PageserverHttpClient(requests.Session): new_timeline_id: Optional[TimelineId] = None, ancestor_timeline_id: Optional[TimelineId] = None, ancestor_start_lsn: Optional[Lsn] = None, + **kwargs, ) -> Dict[Any, Any]: body: Dict[str, Any] = { "new_timeline_id": str(new_timeline_id) if new_timeline_id else None, @@ -281,7 +282,9 @@ class PageserverHttpClient(requests.Session): if pg_version != PgVersion.NOT_SET: body["pg_version"] = int(pg_version) - res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body) + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body, **kwargs + ) self.verbose_error(res) if res.status_code == 409: raise Exception(f"could not create timeline: already exists for id {new_timeline_id}") diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index 31c7ef2b17..e8ec657683 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -136,9 +136,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*") # remove the initial tenant - ## why wait for upload queue? => https://github.com/neondatabase/neon/issues/3865 assert env.initial_timeline - wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, env.initial_timeline) pageserver_http.tenant_detach(env.initial_tenant) assert isinstance(env.remote_storage, LocalFsStorage) tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant) diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index 5c02708457..31f6c1f3d9 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -20,6 +20,7 @@ from fixtures.pageserver.utils import ( assert_tenant_state, wait_for_last_record_lsn, wait_for_upload, + wait_for_upload_queue_empty, wait_until_tenant_state, ) from fixtures.types import Lsn @@ -149,6 +150,7 @@ def test_ondemand_download_timetravel( ##### First start, insert data and upload it to the remote storage env = neon_env_builder.init_start() + pageserver_http = env.pageserver.http_client() # Override defaults, to create more layers tenant, _ = env.neon_cli.create_tenant( @@ -225,7 +227,8 @@ def test_ondemand_download_timetravel( assert filled_current_physical == filled_size, "we don't yet do layer eviction" # Wait until generated image layers are uploaded to S3 - time.sleep(3) + if remote_storage_kind is not None: + wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id) env.pageserver.stop() diff --git a/test_runner/regress/test_proxy.py b/test_runner/regress/test_proxy.py index ee6349436b..ae914e384e 100644 --- a/test_runner/regress/test_proxy.py +++ b/test_runner/regress/test_proxy.py @@ -5,16 +5,18 @@ import pytest from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres -def test_proxy_select_1(static_proxy: NeonProxy): +@pytest.mark.parametrize("option_name", ["project", "endpoint"]) +def test_proxy_select_1(static_proxy: NeonProxy, option_name: str): """ A simplest smoke test: check proxy against a local postgres instance. """ - out = static_proxy.safe_psql("select 1", options="project=generic-project-name") + out = static_proxy.safe_psql("select 1", options=f"{option_name}=generic-project-name") assert out[0][0] == 1 -def test_password_hack(static_proxy: NeonProxy): +@pytest.mark.parametrize("option_name", ["project", "endpoint"]) +def test_password_hack(static_proxy: NeonProxy, option_name: str): """ Check the PasswordHack auth flow: an alternative to SCRAM auth for clients which can't provide the project/endpoint name via SNI or `options`. @@ -23,11 +25,12 @@ def test_password_hack(static_proxy: NeonProxy): user = "borat" password = "password" static_proxy.safe_psql( - f"create role {user} with login password '{password}'", options="project=irrelevant" + f"create role {user} with login password '{password}'", + options=f"{option_name}=irrelevant", ) # Note the format of `magic`! - magic = f"project=irrelevant;{password}" + magic = f"{option_name}=irrelevant;{password}" static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic) # Must also check that invalid magic won't be accepted. @@ -56,55 +59,62 @@ async def test_link_auth(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy): assert out == "42" -def test_proxy_options(static_proxy: NeonProxy): +@pytest.mark.parametrize("option_name", ["project", "endpoint"]) +def test_proxy_options(static_proxy: NeonProxy, option_name: str): """ Check that we pass extra `options` to the PostgreSQL server: - * `project=...` shouldn't be passed at all (otherwise postgres will raise an error). + * `project=...` and `endpoint=...` shouldn't be passed at all + * (otherwise postgres will raise an error). * everything else should be passed as-is. """ - options = "project=irrelevant -cproxytest.option=value" + options = f"{option_name}=irrelevant -cproxytest.option=value" out = static_proxy.safe_psql("show proxytest.option", options=options) assert out[0][0] == "value" - options = "-c proxytest.foo=\\ str project=irrelevant" + options = f"-c proxytest.foo=\\ str {option_name}=irrelevant" out = static_proxy.safe_psql("show proxytest.foo", options=options) assert out[0][0] == " str" -def test_auth_errors(static_proxy: NeonProxy): +@pytest.mark.parametrize("option_name", ["project", "endpoint"]) +def test_auth_errors(static_proxy: NeonProxy, option_name: str): """ Check that we throw very specific errors in some unsuccessful auth scenarios. """ # User does not exist with pytest.raises(psycopg2.Error) as exprinfo: - static_proxy.connect(user="pinocchio", options="project=irrelevant") + static_proxy.connect(user="pinocchio", options=f"{option_name}=irrelevant") text = str(exprinfo.value).strip() assert text.endswith("password authentication failed for user 'pinocchio'") static_proxy.safe_psql( - "create role pinocchio with login password 'magic'", options="project=irrelevant" + "create role pinocchio with login password 'magic'", + options=f"{option_name}=irrelevant", ) # User exists, but password is missing with pytest.raises(psycopg2.Error) as exprinfo: - static_proxy.connect(user="pinocchio", password=None, options="project=irrelevant") + static_proxy.connect(user="pinocchio", password=None, options=f"{option_name}=irrelevant") text = str(exprinfo.value).strip() assert text.endswith("password authentication failed for user 'pinocchio'") # User exists, but password is wrong with pytest.raises(psycopg2.Error) as exprinfo: - static_proxy.connect(user="pinocchio", password="bad", options="project=irrelevant") + static_proxy.connect(user="pinocchio", password="bad", options=f"{option_name}=irrelevant") text = str(exprinfo.value).strip() assert text.endswith("password authentication failed for user 'pinocchio'") # Finally, check that the user can connect - with static_proxy.connect(user="pinocchio", password="magic", options="project=irrelevant"): + with static_proxy.connect( + user="pinocchio", password="magic", options=f"{option_name}=irrelevant" + ): pass -def test_forward_params_to_client(static_proxy: NeonProxy): +@pytest.mark.parametrize("option_name", ["project", "endpoint"]) +def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str): """ Check that we forward all necessary PostgreSQL server params to client. """ @@ -130,7 +140,7 @@ def test_forward_params_to_client(static_proxy: NeonProxy): where name = any(%s) """ - with static_proxy.connect(options="project=irrelevant") as conn: + with static_proxy.connect(options=f"{option_name}=irrelevant") as conn: with conn.cursor() as cur: cur.execute(query, (reported_params_subset,)) for name, value in cur.fetchall(): @@ -138,17 +148,18 @@ def test_forward_params_to_client(static_proxy: NeonProxy): assert conn.get_parameter_status(name) == value +@pytest.mark.parametrize("option_name", ["project", "endpoint"]) @pytest.mark.timeout(5) -def test_close_on_connections_exit(static_proxy: NeonProxy): +def test_close_on_connections_exit(static_proxy: NeonProxy, option_name: str): # Open two connections, send SIGTERM, then ensure that proxy doesn't exit # until after connections close. - with static_proxy.connect(options="project=irrelevant"), static_proxy.connect( - options="project=irrelevant" + with static_proxy.connect(options=f"{option_name}=irrelevant"), static_proxy.connect( + options=f"{option_name}=irrelevant" ): static_proxy.terminate() with pytest.raises(subprocess.TimeoutExpired): static_proxy.wait_for_exit(timeout=2) # Ensure we don't accept any more connections with pytest.raises(psycopg2.OperationalError): - static_proxy.connect(options="project=irrelevant") + static_proxy.connect(options=f"{option_name}=irrelevant") static_proxy.wait_for_exit() diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 6de5f7db04..02f1aac99c 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -2,11 +2,12 @@ # env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ...... import os +import queue import shutil import threading import time from pathlib import Path -from typing import Dict, List, Tuple +from typing import Dict, List, Optional, Tuple import pytest from fixtures.log_helper import log @@ -26,6 +27,7 @@ from fixtures.pageserver.utils import ( ) from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import print_gc_result, query_scalar, wait_until +from requests import ReadTimeout # @@ -81,9 +83,7 @@ def test_remote_storage_backup_and_restore( env.pageserver.allowed_errors.append(".*failed to load remote timeline.*") # we have a bunch of pytest.raises for these below env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*") - env.pageserver.allowed_errors.append( - ".*Cannot attach tenant .*?, local tenant directory already exists.*" - ) + env.pageserver.allowed_errors.append(".*tenant directory already exists.*") env.pageserver.allowed_errors.append(".*simulated failure of remote operation.*") pageserver_http = env.pageserver.http_client() @@ -626,10 +626,7 @@ def test_empty_branch_remote_storage_upload( new_branch_name = "new_branch" new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant) - - with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint: - wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id) - wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id) + assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id) timelines_before_detach = set( map( @@ -658,13 +655,19 @@ def test_empty_branch_remote_storage_upload( ), f"Expected to have same timelines after reattach, but got {timelines_after_detach}" -# Branches off a root branch, but does not write anything to the new branch, so it has a metadata file only. -# Ensures the branch is not on the remote storage and restarts the pageserver — the branch should be uploaded after the restart. @pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS]) def test_empty_branch_remote_storage_upload_on_restart( neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, ): + """ + Branches off a root branch, but does not write anything to the new branch, so + it has a metadata file only. + + Ensures the branch is not on the remote storage and restarts the pageserver + — the upload should be scheduled by load, and create_timeline should await + for it even though it gets 409 Conflict. + """ neon_env_builder.enable_remote_storage( remote_storage_kind=remote_storage_kind, test_name="test_empty_branch_remote_storage_upload_on_restart", @@ -673,35 +676,87 @@ def test_empty_branch_remote_storage_upload_on_restart( env = neon_env_builder.init_start() client = env.pageserver.http_client() - new_branch_name = "new_branch" - new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant) + client.configure_failpoints(("before-upload-index", "return")) - with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint: - wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id) - wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id) + new_branch_timeline_id = TimelineId.generate() + with pytest.raises(ReadTimeout): + client.timeline_create( + tenant_id=env.initial_tenant, + ancestor_timeline_id=env.initial_timeline, + new_timeline_id=new_branch_timeline_id, + pg_version=env.pg_version, + timeout=4, + ) + + env.pageserver.allowed_errors.append( + f".*POST.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing" + ) + + # index upload is now hitting the failpoint, should not block the shutdown env.pageserver.stop() - # Remove new branch from the remote storage - assert isinstance(env.remote_storage, LocalFsStorage) - new_branch_on_remote_storage = ( - env.remote_storage.root - / "tenants" - / str(env.initial_tenant) - / "timelines" - / str(new_branch_timeline_id) + timeline_path = ( + Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id) ) - assert ( - new_branch_on_remote_storage.is_dir() - ), f"'{new_branch_on_remote_storage}' path does not exist on the remote storage" - shutil.rmtree(new_branch_on_remote_storage) - env.pageserver.start() + local_metadata = env.repo_dir / timeline_path / "metadata" + assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload" - wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id) + assert isinstance(env.remote_storage, LocalFsStorage) + new_branch_on_remote_storage = env.remote_storage.root / timeline_path assert ( - new_branch_on_remote_storage.is_dir() - ), f"New branch should have been reuploaded on pageserver restart to the remote storage path '{new_branch_on_remote_storage}'" + not new_branch_on_remote_storage.exists() + ), "failpoint should had prohibited index_part.json upload" + + # during reconciliation we should had scheduled the uploads and on the + # retried create_timeline, we will await for those to complete on next + # client.timeline_create + env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"}) + + # sleep a bit to force the upload task go into exponential backoff + time.sleep(1) + + q: queue.Queue[Optional[PageserverApiException]] = queue.Queue() + barrier = threading.Barrier(2) + + def create_in_background(): + barrier.wait() + try: + client.timeline_create( + tenant_id=env.initial_tenant, + ancestor_timeline_id=env.initial_timeline, + new_timeline_id=new_branch_timeline_id, + pg_version=env.pg_version, + ) + q.put(None) + except PageserverApiException as e: + q.put(e) + + create_thread = threading.Thread(target=create_in_background) + create_thread.start() + + try: + # maximize chances of actually waiting for the uploads by create_timeline + barrier.wait() + + assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading" + + client.configure_failpoints(("before-upload-index", "off")) + conflict = q.get() + + assert conflict, "create_timeline should not have succeeded" + assert ( + conflict.status_code == 409 + ), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict" + + assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id) + + assert ( + new_branch_on_remote_storage / "index_part.json" + ).is_file(), "uploads scheduled during initial load should had been awaited for" + finally: + create_thread.join() def wait_upload_queue_empty( @@ -752,4 +807,17 @@ def get_queued_count( return int(val) +def assert_nothing_to_upload( + client: PageserverHttpClient, + tenant_id: TenantId, + timeline_id: TimelineId, +): + """ + Check last_record_lsn == remote_consistent_lsn. Assert works only for empty timelines, which + do not have anything to compact or gc. + """ + detail = client.timeline_detail(tenant_id, timeline_id) + assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"]) + + # TODO Test that we correctly handle GC of files that are stuck in upload queue. diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 847ae4b2b8..82664cff94 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -685,12 +685,10 @@ def test_load_attach_negatives( pageserver_http.tenant_ignore(tenant_id) - env.pageserver.allowed_errors.append( - ".*Cannot attach tenant .*?, local tenant directory already exists.*" - ) + env.pageserver.allowed_errors.append(".*tenant directory already exists.*") with pytest.raises( expected_exception=PageserverApiException, - match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists", + match="tenant directory already exists", ): pageserver_http.tenant_attach(tenant_id) @@ -734,12 +732,10 @@ def test_ignore_while_attaching( pageserver_http.tenant_ignore(tenant_id) # Cannot attach it due to some local files existing - env.pageserver.allowed_errors.append( - ".*Cannot attach tenant .*?, local tenant directory already exists.*" - ) + env.pageserver.allowed_errors.append(".*tenant directory already exists.*") with pytest.raises( expected_exception=PageserverApiException, - match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists", + match="tenant directory already exists", ): pageserver_http.tenant_attach(tenant_id) diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index 180afd88cd..2a5b30803b 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -1,5 +1,7 @@ import os +import shutil import threading +import time from contextlib import closing, contextmanager from pathlib import Path from typing import Any, Dict, Optional, Tuple @@ -12,6 +14,8 @@ from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, PortDistributor, + RemoteStorageKind, + available_remote_storages, ) from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import ( @@ -512,3 +516,225 @@ def test_tenant_relocation( if line.startswith("listen_pg_addr"): lines[i] = f"listen_pg_addr = 'localhost:{env.pageserver.service_port.pg}'" (env.repo_dir / "config").write_text("\n".join(lines)) + + +# Simulate hard crash of pageserver and re-attach a tenant with a branch +# +# This exercises a race condition after tenant attach, where the +# branch point on the ancestor timeline is greater than the ancestor's +# last-record LSN. We had a bug where GetPage incorrectly followed the +# timeline to the ancestor without waiting for the missing WAL to +# arrive. +@pytest.mark.parametrize("remote_storage_kind", available_remote_storages()) +def test_emergency_relocate_with_branches_slow_replay( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, +): + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_emergency_relocate_with_branches_slow_replay", + ) + + env = neon_env_builder.init_start() + env.pageserver.is_testing_enabled_or_skip() + pageserver_http = env.pageserver.http_client() + + # Prepare for the test: + # + # - Main branch, with a table and two inserts to it. + # - A logical replication message between the inserts, so that we can conveniently + # pause the WAL ingestion between the two inserts. + # - Child branch, created after the inserts + tenant_id, _ = env.neon_cli.create_tenant() + + main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) + with main_endpoint.cursor() as cur: + cur.execute("CREATE TABLE test_reattach (t text)") + cur.execute("INSERT INTO test_reattach VALUES ('before pause')") + + cur.execute("SELECT pg_logical_emit_message(false, 'neon-test', 'between inserts')") + + cur.execute("INSERT INTO test_reattach VALUES ('after pause')") + current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + + main_endpoint.stop() + env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn) + + # Now kill the pageserver, remove the tenant directory, and restart. This simulates + # the scenario that a pageserver dies unexpectedly and cannot be recovered, so we relocate + # the tenant to a different pageserver. We reuse the same pageserver because it's + # simpler than initializing a new one from scratch, but the effect on the single tenant + # is the same. + env.pageserver.stop(immediate=True) + shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id)) + env.pageserver.start() + + # This fail point will pause the WAL ingestion on the main branch, after the + # the first insert + pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")]) + + # Attach and wait a few seconds to give it time to load the tenants, attach to the + # safekeepers, and to stream and ingest the WAL up to the pause-point. + before_attach_time = time.time() + pageserver_http.tenant_attach(tenant_id) + time.sleep(3) + + # The wal ingestion on the main timeline should now be paused at the fail point. + # Run a query on the child branch. The GetPage requests for this should recurse to the + # parent timeline, and wait for the WAL to be ingested there. Otherwise it won't see + # the second insert. + child_endpoint = env.endpoints.create_start("child", tenant_id=tenant_id) + with child_endpoint.cursor() as cur: + cur.execute("SELECT * FROM test_reattach") + assert cur.fetchall() == [("before pause",), ("after pause",)] + + # Sanity check that the failpoint was reached + assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + assert time.time() - before_attach_time > 5 + + # Clean up + pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off")) + + +# Simulate hard crash of pageserver and re-attach a tenant with a branch +# +# This exercises the same race condition after as +# 'test_emergency_relocate_with_branches_slow_replay', but this test case +# is closer to the original scenario where we originally found the +# issue. +# +# In this scenario, the incorrect result to get-request leads to +# *permanent damage* in the child timeline, because ingesting the WAL +# on the child timeline depended on incorrect view of the parent. This +# test reproduced one such case; the symptom was an error on the child, when +# trying to connect to the child endpoint after re-attaching the tenant: +# +# FATAL: database "neondb" does not exist +# +# In the original case where we bumped into this, the error was slightly +# different: +# +# FATAL: "base/16385" is not a valid data directory +# DETAIL: File "base/16385/PG_VERSION" is missing. +# +### Detailed explanation of the original bug and why it lead to that error: +# +# The WAL on the main and the child branches look like this: +# +# Main Child +# 1. CREATE DATABASE +# +# 2. CREATE TABLE AS SELECT ... 3. CREATE TABLE AS SELECT ... +# +# None of these WAL records have been flushed to disk or uploaded to remote +# storage in the pageserver yet, when the tenant is detached. +# +# After detach and re-attach, a walreceiver is spawned on both timelines. +# They will connect to the safekeepers and start ingesting the WAL +# from their respective IndexParts' `disk_consistent_lsn` onward. +# +# The bug occurs if the child branch's walreceiver runs before the +# main's. It receives the SMGR_CREATE WAL record emitted by the +# CREATE TABLE statement (3.), and applies it, without seeing the +# effect of the CREATE DATABASE statement. +# +# To understand why that leads to a 'File "base/16385/PG_VERSION" is +# missing' error, let's look at what the handlers for the WAL records +# do: +# +# CREATE DATABASE WAL record is handled by ingest_xlog_dbase_create: +# +# ingest_xlog_dbase_create: +# put_relmap_file() +# // NOTE 'true': It means that there is a relmapper and PG_VERSION file +# 1: let r = dbdir.dbdirs.insert((spcnode, dbnode), true); +# +# +# CREATE TABLE emits an SMGR_CREATE record, which is handled by: +# +# ingest_xlog_smgr_create: +# put_rel_creation: +# ... +# let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() { +# 2: // Didn't exist. Update dbdir +# dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false); +# let buf = DbDirectory::ser(&dbdir)?; +# self.put(DBDIR_KEY, Value::Image(buf.into())); +# +# // and create the RelDirectory +# RelDirectory::default() +# } else { +# 3: // reldir already exists, fetch it +# RelDirectory::des(&self.get(rel_dir_key, ctx).await?)? +# }; +# +# +# In the correct ordering, the SMGR_CREATE record is applied after the +# CREATE DATABASE record. The CREATE DATABASE creates the entry in the +# 'dbdir', with the 'true' flag that indicates that PG_VERSION exists +# (1). The SMGR_CREATE handler calls put_rel_creation, which finds the +# dbdir entry that the CREATE DATABASE record created, and takes the +# "reldir already exists, fetch it" else-branch at the if statement (3). +# +# In the incorrect ordering, the child walreceiver applies the +# SMGR_CREATE record without seeing the effects of the CREATE +# DATABASE. In that case, put_rel_creation takes the "Didn't +# exist. Update dbir" path (2), and inserts an entry in the +# DbDirectory with 'false' to indicate there is no PG_VERSION file. +# +@pytest.mark.parametrize("remote_storage_kind", available_remote_storages()) +def test_emergency_relocate_with_branches_createdb( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, +): + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_emergency_relocate_with_branches_createdb", + ) + + env = neon_env_builder.init_start() + pageserver_http = env.pageserver.http_client() + + # create new nenant + tenant_id, _ = env.neon_cli.create_tenant() + + main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) + with main_endpoint.cursor() as cur: + cur.execute("SELECT pg_logical_emit_message(false, 'neon-test', 'between inserts')") + + cur.execute("CREATE DATABASE neondb") + current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn) + + with main_endpoint.cursor(dbname="neondb") as cur: + cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,100)") + main_endpoint.stop() + + child_endpoint = env.endpoints.create_start("child", tenant_id=tenant_id) + with child_endpoint.cursor(dbname="neondb") as cur: + cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,200)") + child_endpoint.stop() + + # Kill the pageserver, remove the tenant directory, and restart + env.pageserver.stop(immediate=True) + shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id)) + env.pageserver.start() + + # Wait before ingesting the WAL for CREATE DATABASE on the main branch. The original + # bug reproduced easily even without this, as there is always some delay between + # loading the timeline and establishing the connection to the safekeeper to stream and + # ingest the WAL, but let's make this less dependent on accidental timing. + pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")]) + before_attach_time = time.time() + pageserver_http.tenant_attach(tenant_id) + + child_endpoint.start() + with child_endpoint.cursor(dbname="neondb") as cur: + assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200 + + # Sanity check that the failpoint was reached + assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + assert time.time() - before_attach_time > 5 + + # Clean up + pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off")) diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 8026d7f5c6..5642449ce6 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -217,6 +217,16 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder): labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()]) log.info(f"{sample.name}{{{labels}}} {sample.value}") + # Test that we gather tenant create metric + storage_operation_metrics = [ + "pageserver_storage_operations_seconds_global_bucket", + "pageserver_storage_operations_seconds_global_sum", + "pageserver_storage_operations_seconds_global_count", + ] + for metric in storage_operation_metrics: + value = ps_metrics.query_all(metric, filter={"operation": "create tenant"}) + assert value + @pytest.mark.parametrize( "remote_storage_kind",