diff --git a/.github/workflows/actionlint.yml b/.github/workflows/actionlint.yml
index f2736614bf..078c7f88c4 100644
--- a/.github/workflows/actionlint.yml
+++ b/.github/workflows/actionlint.yml
@@ -24,7 +24,7 @@ jobs:
 
   actionlint:
     needs: [ check-permissions ]
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     steps:
       - uses: actions/checkout@v4
       - uses: reviewdog/action-actionlint@v1
@@ -36,3 +36,17 @@ jobs:
           fail_on_error: true
           filter_mode: nofilter
           level: error
+      - run: |
+          # Fail if any workflow pins a floating `*-latest` runner image, and emit one
+          # error annotation per offending file, pointing at its first matching line.
+          PAT='^\s*runs-on:.*-latest'
+          if grep -ERq "$PAT" .github/workflows
+          then
+            grep -ERl "$PAT" .github/workflows |\
+              while read -r f
+              do
+                l=$(grep -nE "$PAT" "$f" | awk -F: '{print $1}' | head -1)
+                echo "::error file=$f,line=$l::Please, do not use ubuntu-latest images to run on, use LTS instead."
+              done
+            exit 1
+          fi
diff --git a/.github/workflows/approved-for-ci-run.yml b/.github/workflows/approved-for-ci-run.yml
index ab616d17e2..b14b66a439 100644
--- a/.github/workflows/approved-for-ci-run.yml
+++ b/.github/workflows/approved-for-ci-run.yml
@@ -44,7 +44,7 @@ jobs:
       contains(fromJSON('["opened", "synchronize", "reopened", "closed"]'), github.event.action) &&
       contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
 
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
 
     steps:
       - run: gh pr --repo "${GITHUB_REPOSITORY}" edit "${PR_NUMBER}" --remove-label "approved-for-ci-run"
@@ -60,7 +60,7 @@ jobs:
       github.event.action == 'labeled' &&
       contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
 
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
 
     steps:
       - run: gh pr --repo "${GITHUB_REPOSITORY}" edit "${PR_NUMBER}" --remove-label "approved-for-ci-run"
@@ -109,7 +109,7 @@ jobs:
       github.event.action == 'closed' &&
       github.event.pull_request.head.repo.full_name != github.repository
 
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
 
     steps:
       - name: Close PR and delete `ci-run/pr-${{ env.PR_NUMBER }}` branch
diff --git a/.github/workflows/benchmarking.yml
b/.github/workflows/benchmarking.yml
index 1eaf05cd54..57d24063bf 100644
--- a/.github/workflows/benchmarking.yml
+++ b/.github/workflows/benchmarking.yml
@@ -38,6 +38,11 @@ on:
         description: 'AWS-RDS and AWS-AURORA normally only run on Saturday. Set this to true to run them on every workflow_dispatch'
         required: false
         default: false
+      run_only_pgvector_tests:
+        type: boolean
+        description: 'Run pgvector tests but no other tests. If not set, all tests including pgvector tests will be run'
+        required: false
+        default: false
 
 defaults:
   run:
@@ -50,6 +55,7 @@ concurrency:
 
 jobs:
   bench:
+    if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
     env:
       TEST_PG_BENCH_DURATIONS_MATRIX: "300"
       TEST_PG_BENCH_SCALES_MATRIX: "10,100"
@@ -120,6 +126,7 @@ jobs:
           SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
 
   generate-matrices:
+    if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
     # Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
     #
     # Available platforms:
@@ -130,7 +137,7 @@ jobs:
     # - rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
     env:
       RUN_AWS_RDS_AND_AURORA: ${{ github.event.inputs.run_AWS_RDS_AND_AURORA || 'false' }}
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     outputs:
       pgbench-compare-matrix: ${{ steps.pgbench-compare-matrix.outputs.matrix }}
       olap-compare-matrix: ${{ steps.olap-compare-matrix.outputs.matrix }}
@@ -197,6 +204,7 @@ jobs:
           echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
 
   pgbench-compare:
+    if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
     needs: [ generate-matrices ]
 
     strategy:
@@ -343,6 +351,92 @@ jobs:
         env:
           SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
 
+  pgbench-pgvector:
+    env:
+      TEST_PG_BENCH_DURATIONS_MATRIX: "15m"
+      TEST_PG_BENCH_SCALES_MATRIX: "1"
+      POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
+      DEFAULT_PG_VERSION: 16
+      TEST_OUTPUT: /tmp/test_output
+      BUILD_TYPE: remote
+      SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
+      PLATFORM: "neon-captest-pgvector"
+
+    runs-on: [ self-hosted, us-east-2, x64 ]
+    container:
+      image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
+      options: --init
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Download Neon artifact
+        uses: ./.github/actions/download
+        with:
+          name: neon-${{ runner.os }}-release-artifact
+          path: /tmp/neon/
+          prefix: latest
+
+      - name: Add Postgres binaries to PATH
+        run: |
+          "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench" --version
+          echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> "$GITHUB_PATH"
+
+      - name: Set up Connection String
+        id: set-up-connstr
+        env:
+          CONNSTR: ${{ secrets.BENCHMARK_PGVECTOR_CONNSTR }}
+        run: |
+          # CONNSTR is passed via `env:` so the secret is never parsed as shell syntax.
+          echo "connstr=${CONNSTR}" >> "$GITHUB_OUTPUT"
+
+          QUERIES=("SELECT version()")
+          QUERIES+=("SHOW neon.tenant_id")
+          QUERIES+=("SHOW neon.timeline_id")
+
+          for q in "${QUERIES[@]}"; do
+            psql "${CONNSTR}" -c "${q}"
+          done
+
+      - name: Benchmark pgvector hnsw indexing
+        uses: ./.github/actions/run-python-test-set
+        with:
+          build_type: ${{ env.BUILD_TYPE }}
+          test_selection: performance/test_perf_olap.py
+          run_in_parallel: false
+          save_perf_report: ${{ env.SAVE_PERF_REPORT }}
+          extra_params: -m remote_cluster --timeout 21600 -k test_pgvector_indexing
+        env:
+          VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
+          PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
+          BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
+
+      - name: Benchmark pgvector hnsw queries
+        uses: ./.github/actions/run-python-test-set
+        with:
+          build_type: ${{ env.BUILD_TYPE }}
+          test_selection: performance
+          run_in_parallel: false
+          save_perf_report: ${{ env.SAVE_PERF_REPORT }}
+          extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
+        env:
+          BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
+          VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
+          PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
+
+      - name: Create Allure report
+        if: ${{ !cancelled() }}
+        uses: ./.github/actions/allure-report-generate
+
+      - name: Post to a Slack channel
+        if: ${{ github.event.schedule && failure() }}
+        uses: slackapi/slack-github-action@v1
+        with:
+          channel-id: "C033QLM5P7D" # dev-staging-stream
+          slack-message: "Periodic perf testing neon-captest-pgvector: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
+        env:
+          SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
+
   clickbench-compare:
     # ClichBench DB for rds-aurora and rds-Postgres deployed to the same clusters
     # we use for performance testing in pgbench-compare.
@@ -351,7 +445,7 @@ jobs:
     #
     # *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
     # *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
-    if: ${{ !cancelled() }}
+    if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
     needs: [ generate-matrices, pgbench-compare ]
 
     strategy:
@@ -455,7 +549,7 @@ jobs:
     # We might change it after https://github.com/neondatabase/neon/issues/2900.
# # *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB) - if: ${{ !cancelled() }} + if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }} needs: [ generate-matrices, clickbench-compare ] strategy: @@ -557,7 +651,7 @@ jobs: SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} user-examples-compare: - if: ${{ !cancelled() }} + if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }} needs: [ generate-matrices, tpch-compare ] strategy: diff --git a/.github/workflows/build-build-tools-image.yml b/.github/workflows/build-build-tools-image.yml index bdf00bcaae..9aacb09d10 100644 --- a/.github/workflows/build-build-tools-image.yml +++ b/.github/workflows/build-build-tools-image.yml @@ -88,7 +88,7 @@ jobs: merge-images: needs: [ build-image ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 env: IMAGE_TAG: ${{ inputs.image-tag }} diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index f8c011a0a5..b9caf76060 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -35,7 +35,7 @@ jobs: cancel-previous-e2e-tests: needs: [ check-permissions ] if: github.event_name == 'pull_request' - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Cancel previous e2e-tests runs for this PR @@ -549,7 +549,7 @@ jobs: report-benchmarks-failures: needs: [ benchmarks, create-test-report ] if: github.ref_name == 'main' && failure() && needs.benchmarks.result == 'failure' - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - uses: slackapi/slack-github-action@v1 @@ -774,7 +774,7 @@ jobs: neon-image: needs: [ neon-image-arch, tag ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - uses: docker/login-action@v3 @@ -884,7 +884,7 @@ jobs: compute-node-image: needs: [ compute-node-image-arch, tag ] - runs-on: ubuntu-latest + runs-on: 
ubuntu-22.04 strategy: matrix: @@ -1032,7 +1032,7 @@ jobs: promote-images: needs: [ check-permissions, tag, test-images, vm-compute-node-image ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 env: VERSIONS: v14 v15 v16 @@ -1077,7 +1077,7 @@ jobs: trigger-custom-extensions-build-and-wait: needs: [ check-permissions, tag ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Set PR's status to pending and request a remote CI test run: | diff --git a/.github/workflows/check-build-tools-image.yml b/.github/workflows/check-build-tools-image.yml index a1e22cf93f..97116940a0 100644 --- a/.github/workflows/check-build-tools-image.yml +++ b/.github/workflows/check-build-tools-image.yml @@ -19,7 +19,7 @@ permissions: {} jobs: check-image: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 outputs: tag: ${{ steps.get-build-tools-tag.outputs.image-tag }} found: ${{ steps.check-image.outputs.found }} diff --git a/.github/workflows/check-permissions.yml b/.github/workflows/check-permissions.yml index c3357c6cf8..9c42794797 100644 --- a/.github/workflows/check-permissions.yml +++ b/.github/workflows/check-permissions.yml @@ -16,7 +16,7 @@ permissions: {} jobs: check-permissions: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Disallow CI runs on PRs from forks if: | diff --git a/.github/workflows/cleanup-caches-by-a-branch.yml b/.github/workflows/cleanup-caches-by-a-branch.yml index d8c225dedb..0c074e36dc 100644 --- a/.github/workflows/cleanup-caches-by-a-branch.yml +++ b/.github/workflows/cleanup-caches-by-a-branch.yml @@ -9,7 +9,7 @@ on: jobs: cleanup: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Cleanup run: | diff --git a/.github/workflows/pg_clients.yml b/.github/workflows/pg_clients.yml index 50e3227a74..fef3aec754 100644 --- a/.github/workflows/pg_clients.yml +++ b/.github/workflows/pg_clients.yml @@ -20,7 +20,7 @@ concurrency: jobs: test-postgres-client-libs: # TODO: switch to gen2 runner, requires docker - runs-on: [ 
ubuntu-latest ] + runs-on: ubuntu-22.04 env: DEFAULT_PG_VERSION: 14 diff --git a/.github/workflows/pin-build-tools-image.yml b/.github/workflows/pin-build-tools-image.yml index d495a158e8..024594532f 100644 --- a/.github/workflows/pin-build-tools-image.yml +++ b/.github/workflows/pin-build-tools-image.yml @@ -26,7 +26,7 @@ permissions: {} jobs: tag-image: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 env: FROM_TAG: ${{ inputs.from-tag }} diff --git a/.github/workflows/release-notify.yml b/.github/workflows/release-notify.yml index ba396dba74..8bd10e993c 100644 --- a/.github/workflows/release-notify.yml +++ b/.github/workflows/release-notify.yml @@ -19,7 +19,7 @@ on: jobs: notify: - runs-on: [ ubuntu-latest ] + runs-on: ubuntu-22.04 steps: - uses: neondatabase/dev-actions/release-pr-notify@main diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index fe24f6330e..90a3aaaf2d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -26,7 +26,7 @@ defaults: jobs: create-storage-release-branch: if: ${{ github.event.schedule == '0 6 * * MON' || format('{0}', inputs.create-storage-release-branch) == 'true' }} - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 permissions: contents: write # for `git push` @@ -65,7 +65,7 @@ jobs: create-proxy-release-branch: if: ${{ github.event.schedule == '0 6 * * THU' || format('{0}', inputs.create-proxy-release-branch) == 'true' }} - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 permissions: contents: write # for `git push` diff --git a/.github/workflows/trigger-e2e-tests.yml b/.github/workflows/trigger-e2e-tests.yml index 7111ee37fa..77928a343e 100644 --- a/.github/workflows/trigger-e2e-tests.yml +++ b/.github/workflows/trigger-e2e-tests.yml @@ -19,7 +19,7 @@ env: jobs: cancel-previous-e2e-tests: if: github.event_name == 'pull_request' - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Cancel previous e2e-tests runs for this PR @@ -31,7 +31,7 @@ jobs: --field 
concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}" tag: - runs-on: [ ubuntu-latest ] + runs-on: ubuntu-22.04 outputs: build-tag: ${{ steps.build-tag.outputs.tag }} @@ -62,7 +62,7 @@ jobs: trigger-e2e-tests: needs: [ tag ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 env: TAG: ${{ needs.tag.outputs.build-tag }} steps: diff --git a/Cargo.lock b/Cargo.lock index d8f9021eb8..44edbabaf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -776,7 +776,6 @@ dependencies = [ "pin-project", "serde", "time", - "tz-rs", "url", "uuid", ] @@ -1291,12 +1290,6 @@ dependencies = [ "tiny-keccak", ] -[[package]] -name = "const_fn" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935" - [[package]] name = "const_format" version = "0.2.30" @@ -1976,21 +1969,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.1.0" @@ -2620,19 +2598,6 @@ dependencies = [ "tokio-io-timeout", ] -[[package]] -name = "hyper-tls" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" -dependencies = [ - "bytes", - "hyper 0.14.26", - "native-tls", - "tokio", - "tokio-native-tls", -] - [[package]] name = "hyper-util" version = "0.1.3" @@ -3168,24 +3133,6 @@ version = "0.8.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "native-tls" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nix" version = "0.25.1" @@ -3356,15 +3303,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_threads" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" -dependencies = [ - "libc", -] - [[package]] name = "oauth2" version = "4.4.2" @@ -3414,50 +3352,12 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" -[[package]] -name = "openssl" -version = "0.10.60" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800" -dependencies = [ - "bitflags 2.4.1", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.96" -source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "opentelemetry" version = "0.20.0" @@ -4105,17 +4005,6 @@ dependencies = [ "tokio-postgres", ] -[[package]] -name = "postgres-native-tls" -version = "0.5.0" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" -dependencies = [ - "native-tls", - "tokio", - "tokio-native-tls", - "tokio-postgres", -] - [[package]] name = "postgres-protocol" version = "0.6.4" @@ -4224,6 +4113,7 @@ version = "0.1.0" dependencies = [ "byteorder", "bytes", + "itertools", "pin-project-lite", "postgres-protocol", "rand 0.8.5", @@ -4413,6 +4303,7 @@ dependencies = [ "http 1.1.0", "http-body-util", "humantime", + "humantime-serde", "hyper 0.14.26", "hyper 1.2.0", "hyper-util", @@ -4423,7 +4314,6 @@ dependencies = [ "md5", "measured", "metrics", - "native-tls", "once_cell", "opentelemetry", "parking_lot 0.12.1", @@ -4431,7 +4321,6 @@ dependencies = [ "parquet_derive", "pbkdf2", "pin-project-lite", - "postgres-native-tls", "postgres-protocol", "postgres_backend", "pq_proto", @@ -4450,6 +4339,7 @@ dependencies = [ "rstest", "rustc-hash", "rustls 0.22.4", + "rustls-native-certs 0.7.0", "rustls-pemfile 2.1.1", "scopeguard", "serde", @@ -4479,7 +4369,6 @@ dependencies = [ "utils", "uuid", "walkdir", - "webpki-roots 0.25.2", "workspace_hack", "x509-parser", ] @@ -4786,20 +4675,21 @@ dependencies = [ "http 0.2.9", "http-body 0.4.5", "hyper 0.14.26", - "hyper-tls", + "hyper-rustls 0.24.0", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", + "rustls 0.21.11", + "rustls-pemfile 1.0.2", "serde", "serde_json", "serde_urlencoded", "tokio", - "tokio-native-tls", + "tokio-rustls 0.24.0", "tokio-util", "tower-service", "url", @@ -4807,6 +4697,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams 0.3.0", "web-sys", + 
"webpki-roots 0.25.2", "winreg 0.50.0", ] @@ -5232,20 +5123,22 @@ dependencies = [ "hex", "histogram", "itertools", - "native-tls", + "once_cell", "pageserver", "pageserver_api", - "postgres-native-tls", "postgres_ffi", "rand 0.8.5", "remote_storage", "reqwest 0.12.4", + "rustls 0.22.4", + "rustls-native-certs 0.7.0", "serde", "serde_json", "serde_with", "thiserror", "tokio", "tokio-postgres", + "tokio-postgres-rustls", "tokio-rustls 0.25.0", "tokio-stream", "tokio-util", @@ -6189,8 +6082,6 @@ checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc" dependencies = [ "itoa", "js-sys", - "libc", - "num_threads", "serde", "time-core", "time-macros", @@ -6300,16 +6191,6 @@ dependencies = [ "syn 2.0.52", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-postgres" version = "0.7.7" @@ -6716,15 +6597,6 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" -[[package]] -name = "tz-rs" -version = "0.6.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33851b15c848fad2cf4b105c6bb66eb9512b6f6c44a4b13f57c53c73c707e2b4" -dependencies = [ - "const_fn", -] - [[package]] name = "uname" version = "0.1.1" @@ -7629,9 +7501,9 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" dependencies = [ "zeroize_derive", ] diff --git a/Cargo.toml b/Cargo.toml index 0887c039f8..58715db32b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,10 +46,10 @@ 
anyhow = { version = "1.0", features = ["backtrace"] } arc-swap = "1.6" async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] } atomic-take = "1.1.0" -azure_core = "0.19" -azure_identity = "0.19" -azure_storage = "0.19" -azure_storage_blobs = "0.19" +azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] } +azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] } +azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] } +azure_storage_blobs = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] } flate2 = "1.0.26" async-stream = "0.3" async-trait = "0.1" @@ -114,7 +114,6 @@ md5 = "0.7.0" measured = { version = "0.0.21", features=["lasso"] } measured-process = { version = "0.0.21" } memoffset = "0.8" -native-tls = "0.2" nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] } notify = "6.0.0" num_cpus = "1.15" @@ -191,7 +190,7 @@ url = "2.2" urlencoding = "2.1" uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] } walkdir = "2.3.2" -webpki-roots = "0.25" +rustls-native-certs = "0.7" x509-parser = "0.15" ## TODO replace this with tracing @@ -200,7 +199,6 @@ log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" } -postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" } postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" } postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" } @@ -241,8 +239,7 @@ tonic-build = "0.9" [patch.crates-io] -# This is only needed for proxy's tests. 
-# TODO: we should probably fork `tokio-postgres-rustls` instead.
+# Needed to get `tokio-postgres-rustls` to depend on our fork.
 tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
 
 # bug fixes for UUID
diff --git a/deny.toml b/deny.toml
index 22e39a2ca3..469609c496 100644
--- a/deny.toml
+++ b/deny.toml
@@ -99,6 +99,13 @@ name = "async-executor"
 [[bans.deny]]
 name = "smol"
 
+[[bans.deny]]
+# We want to use rustls instead of the platform's native tls implementation.
+name = "native-tls"
+
+[[bans.deny]]
+name = "openssl"
+
 # This section is considered when running `cargo deny check sources`.
 # More documentation about the 'sources' section can be found here:
 # https://embarkstudios.github.io/cargo-deny/checks/sources/cfg.html
diff --git a/libs/pq_proto/Cargo.toml b/libs/pq_proto/Cargo.toml
index 6eeb3bafef..8afabe670e 100644
--- a/libs/pq_proto/Cargo.toml
+++ b/libs/pq_proto/Cargo.toml
@@ -7,6 +7,7 @@ license.workspace = true
 [dependencies]
 bytes.workspace = true
 byteorder.workspace = true
+itertools.workspace = true
 pin-project-lite.workspace = true
 postgres-protocol.workspace = true
 rand.workspace = true
diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs
index 522b65f5d1..cee3742017 100644
--- a/libs/pq_proto/src/lib.rs
+++ b/libs/pq_proto/src/lib.rs
@@ -7,8 +7,9 @@ pub mod framed;
 
 use byteorder::{BigEndian, ReadBytesExt};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
-use std::{borrow::Cow, collections::HashMap, fmt, io, str};
+use std::{borrow::Cow, fmt, io, str};
 
 // re-export for use in utils pageserver_feedback.rs
 pub use postgres_protocol::PG_EPOCH;
@@ -50,15 +51,37 @@ pub enum FeStartupPacket {
     },
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone, Default)]
+pub struct StartupMessageParamsBuilder {
+    params: BytesMut,
+}
+
+impl StartupMessageParamsBuilder {
+    /// Set parameter's value by its name.
+    /// name and value must not contain a \0 byte
+    pub fn insert(&mut self, name: &str, value: &str) {
+        self.params.put(name.as_bytes());
+        self.params.put(&b"\0"[..]);
+        self.params.put(value.as_bytes());
+        self.params.put(&b"\0"[..]);
+    }
+
+    pub fn freeze(self) -> StartupMessageParams {
+        StartupMessageParams {
+            params: self.params.freeze(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Default)]
 pub struct StartupMessageParams {
-    params: HashMap<String, String>,
+    params: Bytes,
 }
 
 impl StartupMessageParams {
     /// Get parameter's value by its name.
     pub fn get(&self, name: &str) -> Option<&str> {
-        self.params.get(name).map(|s| s.as_str())
+        self.iter().find_map(|(k, v)| (k == name).then_some(v))
     }
 
     /// Split command-line options according to PostgreSQL's logic,
@@ -112,15 +135,19 @@ impl StartupMessageParams {
 
     /// Iterate through key-value pairs in an arbitrary order.
     pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
-        self.params.iter().map(|(k, v)| (k.as_str(), v.as_str()))
+        let params =
+            std::str::from_utf8(&self.params).expect("should be validated as utf8 already");
+        params.split_terminator('\0').tuples()
     }
 
     // This function is mostly useful in tests.
     #[doc(hidden)]
     pub fn new<'a, const N: usize>(pairs: [(&'a str, &'a str); N]) -> Self {
-        Self {
-            params: pairs.map(|(k, v)| (k.to_owned(), v.to_owned())).into(),
+        let mut b = StartupMessageParamsBuilder::default();
+        for (k, v) in pairs {
+            b.insert(k, v)
         }
+        b.freeze()
     }
 }
 
@@ -345,35 +372,29 @@ impl FeStartupPacket {
             (major_version, minor_version) => {
                 // StartupMessage
 
-                // Parse pairs of null-terminated strings (key, value).
-                // See `postgres: ProcessStartupPacket, build_startup_packet`.
-                let mut tokens = str::from_utf8(&msg)
-                    .map_err(|_e| {
-                        ProtocolError::BadMessage("StartupMessage params: invalid utf-8".to_owned())
-                    })?
-                    .strip_suffix('\0') // drop packet's own null
-                    .ok_or_else(|| {
-                        ProtocolError::Protocol(
-                            "StartupMessage params: missing null terminator".to_string(),
-                        )
-                    })?
-                    .split_terminator('\0');
-
-                let mut params = HashMap::new();
-                while let Some(name) = tokens.next() {
-                    let value = tokens.next().ok_or_else(|| {
-                        ProtocolError::Protocol(
-                            "StartupMessage params: key without value".to_string(),
-                        )
-                    })?;
-
-                    params.insert(name.to_owned(), value.to_owned());
-                }
+                let s = str::from_utf8(&msg).map_err(|_e| {
+                    ProtocolError::BadMessage("StartupMessage params: invalid utf-8".to_owned())
+                })?;
+                let s = s.strip_suffix('\0').ok_or_else(|| {
+                    ProtocolError::Protocol(
+                        "StartupMessage params: missing null terminator".to_string(),
+                    )
+                })?;
+                // Parameters are (name, value) pairs of null-terminated strings.
+                // Reject a dangling name here: `StartupMessageParams::iter` groups
+                // tokens with `tuples()`, which would silently drop it.
+                if s.split_terminator('\0').count() % 2 != 0 {
+                    return Err(ProtocolError::Protocol(
+                        "StartupMessage params: key without value".to_string(),
+                    ));
+                }
 
                 FeStartupPacket::StartupMessage {
                     major_version,
                     minor_version,
-                    params: StartupMessageParams { params },
+                    params: StartupMessageParams {
+                        params: msg.slice_ref(s.as_bytes()),
+                    },
                 }
             }
         };
diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs
index 8704b72921..870684b399 100644
--- a/libs/utils/src/failpoint_support.rs
+++ b/libs/utils/src/failpoint_support.rs
@@ -9,6 +9,34 @@ use serde::{Deserialize, Serialize};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 
+/// Declare a failpoint that can use the `pause` failpoint action.
+/// We don't want to block the executor thread, hence, spawn_blocking + await.
+/// NB: `cfg!(feature = "testing")` is evaluated in the crate that invokes the macro.
+#[macro_export]
+macro_rules! pausable_failpoint {
+    ($name:literal) => {
+        if cfg!(feature = "testing") {
+            tokio::task::spawn_blocking({
+                let current = tracing::Span::current();
+                move || {
+                    let _entered = current.entered();
+                    tracing::info!("at failpoint {}", $name);
+                    fail::fail_point!($name);
+                }
+            })
+            .await
+            .expect("spawn_blocking");
+        }
+    };
+    ($name:literal, $cond:expr) => {
+        if cfg!(feature = "testing") {
+            if $cond {
+                $crate::pausable_failpoint!($name)
+            }
+        }
+    };
+}
+
 /// use with fail::cfg("$name", "return(2000)")
 ///
 /// The effect is similar to a "sleep(2000)" action, i.e.
we sleep for the diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 540eb10ed2..e6bfd57a44 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -42,6 +42,7 @@ use utils::completion; use utils::crashsafe::path_with_suffix_extension; use utils::failpoint_support; use utils::fs_ext; +use utils::pausable_failpoint; use utils::sync::gate::Gate; use utils::sync::gate::GateGuard; use utils::timeout::timeout_cancellable; @@ -122,32 +123,6 @@ use utils::{ lsn::{Lsn, RecordLsn}, }; -/// Declare a failpoint that can use the `pause` failpoint action. -/// We don't want to block the executor thread, hence, spawn_blocking + await. -macro_rules! pausable_failpoint { - ($name:literal) => { - if cfg!(feature = "testing") { - tokio::task::spawn_blocking({ - let current = tracing::Span::current(); - move || { - let _entered = current.entered(); - tracing::info!("at failpoint {}", $name); - fail::fail_point!($name); - } - }) - .await - .expect("spawn_blocking"); - } - }; - ($name:literal, $cond:expr) => { - if cfg!(feature = "testing") { - if $cond { - pausable_failpoint!($name) - } - } - }; -} - pub mod blob_io; pub mod block_io; pub mod vectored_blob_io; diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 3173a33dad..7c6640eaac 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -8,7 +8,7 @@ use tokio::sync::OwnedMutexGuard; use tokio_util::sync::CancellationToken; use tracing::{error, instrument, Instrument}; -use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId}; +use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId, pausable_failpoint}; use crate::{ config::PageServerConf, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 23904b9da4..73438a790f 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -197,6 +197,7 @@ 
pub(crate) use upload::upload_initdb_dir; use utils::backoff::{ self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; +use utils::pausable_failpoint; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU32, Ordering}; diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index caa843316f..e8e824f415 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -9,7 +9,7 @@ use std::time::SystemTime; use tokio::fs::{self, File}; use tokio::io::AsyncSeekExt; use tokio_util::sync::CancellationToken; -use utils::backoff; +use utils::{backoff, pausable_failpoint}; use super::Generation; use crate::tenant::remote_timeline_client::{ diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index 252b6eb11b..af6840f525 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -187,6 +187,7 @@ impl SecondaryTenant { }; let now = SystemTime::now(); + tracing::info!("Evicting secondary layer"); let this = self.clone(); diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 0ec1bd649b..5c915d6b53 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -909,6 +909,7 @@ impl<'a> TenantDownloader<'a> { strftime(&layer.access_time), strftime(evicted_at) ); + self.skip_layer(layer); continue; } } @@ -963,6 +964,15 @@ impl<'a> TenantDownloader<'a> { Ok(()) } + /// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics + fn skip_layer(&self, layer: HeatMapLayer) { + let mut progress = self.secondary_state.progress.lock().unwrap(); + progress.layers_total = progress.layers_total.saturating_sub(1); + progress.bytes_total = progress + .bytes_total + 
.saturating_sub(layer.metadata.file_size); + } + async fn download_layer( &self, tenant_shard_id: &TenantShardId, @@ -1012,13 +1022,7 @@ impl<'a> TenantDownloader<'a> { "Skipped downloading missing layer {}, raced with compaction/gc?", layer.name ); - - // If the layer is 404, adjust the progress statistics to reflect that we will not download it. - let mut progress = self.secondary_state.progress.lock().unwrap(); - progress.layers_total = progress.layers_total.saturating_sub(1); - progress.bytes_total = progress - .bytes_total - .saturating_sub(layer.metadata.file_size); + self.skip_layer(layer); return Ok(None); } diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index ba2b8afd03..bf2d8a47b4 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -17,7 +17,7 @@ use crate::tenant::{Tenant, TenantState}; use rand::Rng; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::{backoff, completion}; +use utils::{backoff, completion, pausable_failpoint}; static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 1bdbddd95f..d4f6e25843 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -41,7 +41,7 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::{ bin_ser::BeSer, - fs_ext, + fs_ext, pausable_failpoint, sync::gate::{Gate, GateGuard}, vec_map::VecMap, }; diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index b5dfc86e77..5ca8544d49 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -7,7 +7,7 @@ use anyhow::Context; use pageserver_api::{models::TimelineState, shard::TenantShardId}; use tokio::sync::OwnedMutexGuard; use tracing::{error, info, instrument, Instrument}; -use utils::{crashsafe, fs_ext, id::TimelineId}; +use 
utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint}; use crate::{ config::PageServerConf, diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index a9c8d59c3a..5eae2d8204 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -125,13 +125,6 @@ typedef struct * - WL_EXIT_ON_PM_DEATH. */ WaitEventSet *wes_read; - /*--- - * WaitEventSet containing: - * - WL_SOCKET_WRITABLE on 'conn' - * - WL_LATCH_SET on MyLatch, and - * - WL_EXIT_ON_PM_DEATH. - */ - WaitEventSet *wes_write; } PageServer; static PageServer page_servers[MAX_SHARDS]; @@ -336,11 +329,6 @@ CLEANUP_AND_DISCONNECT(PageServer *shard) FreeWaitEventSet(shard->wes_read); shard->wes_read = NULL; } - if (shard->wes_write) - { - FreeWaitEventSet(shard->wes_write); - shard->wes_write = NULL; - } if (shard->conn) { PQfinish(shard->conn); @@ -436,22 +424,6 @@ pageserver_connect(shardno_t shard_no, int elevel) return false; } - shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3); - AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); - AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL); - - shard->wes_write = CreateWaitEventSet(TopMemoryContext, 3); - AddWaitEventToSet(shard->wes_write, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - AddWaitEventToSet(shard->wes_write, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); - AddWaitEventToSet(shard->wes_write, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE, - PQsocket(shard->conn), - NULL, NULL); - shard->state = PS_Connecting_Startup; /* fallthrough */ } @@ -460,13 +432,12 @@ pageserver_connect(shardno_t shard_no, int elevel) char *pagestream_query; int ps_send_query_ret; bool connected = false; - + int poll_result = PGRES_POLLING_WRITING; neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_Startup"); do { WaitEvent event; - int poll_result = 
PQconnectPoll(shard->conn); switch (poll_result) { @@ -497,25 +468,45 @@ pageserver_connect(shardno_t shard_no, int elevel) } case PGRES_POLLING_READING: /* Sleep until there's something to do */ - (void) WaitEventSetWait(shard->wes_read, -1L, &event, 1, - PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - - /* query cancellation, backend shutdown */ - CHECK_FOR_INTERRUPTS(); - + while (true) + { + int rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE, + PQsocket(shard->conn), + 0, + PG_WAIT_EXTENSION); + elog(DEBUG5, "PGRES_POLLING_READING=>%d", rc); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + /* query cancellation, backend shutdown */ + CHECK_FOR_INTERRUPTS(); + } + if (rc & WL_SOCKET_READABLE) + break; + } /* PQconnectPoll() handles the socket polling state updates */ break; case PGRES_POLLING_WRITING: /* Sleep until there's something to do */ - (void) WaitEventSetWait(shard->wes_write, -1L, &event, 1, - PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - - /* query cancellation, backend shutdown */ - CHECK_FOR_INTERRUPTS(); - + while (true) + { + int rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_WRITEABLE, + PQsocket(shard->conn), + 0, + PG_WAIT_EXTENSION); + elog(DEBUG5, "PGRES_POLLING_WRITING=>%d", rc); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + /* query cancellation, backend shutdown */ + CHECK_FOR_INTERRUPTS(); + } + if (rc & WL_SOCKET_WRITEABLE) + break; + } /* PQconnectPoll() handles the socket polling state updates */ break; @@ -524,12 +515,22 @@ pageserver_connect(shardno_t shard_no, int elevel) connected = true; break; } + poll_result = PQconnectPoll(shard->conn); + elog(DEBUG5, "PQconnectPoll=>%d", poll_result); } while (!connected); /* No more polling needed; connection succeeded */ shard->last_connect_time = GetCurrentTimestamp(); + shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3); + AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET, + 
MyLatch, NULL); + AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL); + + switch (neon_protocol_version) { case 2: diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index ac505fe6fb..0e4d210be8 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -584,9 +584,9 @@ prefetch_read(PrefetchRequest *slot) slot->response != NULL || slot->my_ring_index != MyPState->ring_receive) neon_shard_log(slot->shard_no, ERROR, - "Incorrect prefetch read: status=%d response=%llx my=%llu receive=%llu", - slot->status, (size_t) (void *) slot->response, - slot->my_ring_index, MyPState->ring_receive); + "Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu", + slot->status, slot->response, + (long)slot->my_ring_index, (long)MyPState->ring_receive); old = MemoryContextSwitchTo(MyPState->errctx); response = (NeonResponse *) page_server->receive(slot->shard_no); @@ -606,8 +606,8 @@ prefetch_read(PrefetchRequest *slot) else { neon_shard_log(slot->shard_no, WARNING, - "No response from reading prefetch entry %llu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect", - slot->my_ring_index, + "No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. 
This can be caused by a concurrent disconnect", + (long)slot->my_ring_index, RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)), slot->buftag.forkNum, slot->buftag.blockNum); return false; diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 7da0763bc1..288f7769fe 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -38,6 +38,7 @@ hmac.workspace = true hostname.workspace = true http.workspace = true humantime.workspace = true +humantime-serde.workspace = true hyper.workspace = true hyper1 = { package = "hyper", version = "1.2", features = ["server"] } hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] } @@ -82,6 +83,7 @@ thiserror.workspace = true tikv-jemallocator.workspace = true tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] } tokio-postgres.workspace = true +tokio-postgres-rustls.workspace = true tokio-rustls.workspace = true tokio-util.workspace = true tokio = { workspace = true, features = ["signal"] } @@ -94,10 +96,8 @@ url.workspace = true urlencoding.workspace = true utils.workspace = true uuid.workspace = true -webpki-roots.workspace = true +rustls-native-certs.workspace = true x509-parser.workspace = true -native-tls.workspace = true -postgres-native-tls.workspace = true postgres-protocol.workspace = true redis.workspace = true diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index 3555eba543..f757a15fbb 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -35,7 +35,7 @@ use crate::{ }, stream, url, }; -use crate::{scram, EndpointCacheKey, EndpointId, Normalize, RoleName}; +use crate::{scram, EndpointCacheKey, EndpointId, RoleName}; /// Alternative to [`std::borrow::Cow`] but doesn't need `T: ToOwned` as we don't need that functionality pub enum MaybeOwned<'a, T> { diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs index 415a4b7d85..5932e1337c 100644 --- a/proxy/src/auth/backend/link.rs +++ b/proxy/src/auth/backend/link.rs @@ -100,6 +100,7 
@@ pub(super) async fn authenticate( .dbname(&db_info.dbname) .user(&db_info.user); + ctx.set_dbname(db_info.dbname.into()); ctx.set_user(db_info.user.into()); ctx.set_project(db_info.aux.clone()); info!("woken up a compute node"); diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index 783a1a5a21..d06f5614f1 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -11,7 +11,6 @@ use crate::{ }; use itertools::Itertools; use pq_proto::StartupMessageParams; -use smol_str::SmolStr; use std::{collections::HashSet, net::IpAddr, str::FromStr}; use thiserror::Error; use tracing::{info, warn}; @@ -96,13 +95,6 @@ impl ComputeUserInfoMaybeEndpoint { let get_param = |key| params.get(key).ok_or(MissingKey(key)); let user: RoleName = get_param("user")?.into(); - // record the values if we have them - ctx.set_application(params.get("application_name").map(SmolStr::from)); - ctx.set_user(user.clone()); - if let Some(dbname) = params.get("database") { - ctx.set_dbname(dbname.into()); - } - // Project name might be passed via PG's command-line options. 
let endpoint_option = params .options_raw() diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 5be0653a09..e541739f71 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -557,14 +557,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { let config::ConcurrencyLockOptions { shards, - permits, + limiter, epoch, timeout, } = args.wake_compute_lock.parse()?; - info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)"); + info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)"); let locks = Box::leak(Box::new(console::locks::ApiLocks::new( "wake_compute_lock", - permits, + limiter, shards, timeout, epoch, @@ -603,14 +603,19 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { let config::ConcurrencyLockOptions { shards, - permits, + limiter, epoch, timeout, } = args.connect_compute_lock.parse()?; - info!(permits, shards, ?epoch, "Using NodeLocks (connect_compute)"); + info!( + ?limiter, + shards, + ?epoch, + "Using NodeLocks (connect_compute)" + ); let connect_compute_locks = console::locks::ApiLocks::new( "connect_compute_lock", - permits, + limiter, shards, timeout, epoch, diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 4433b3c1c2..feb09d5638 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -10,11 +10,14 @@ use crate::{ }; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; +use once_cell::sync::OnceCell; use pq_proto::StartupMessageParams; -use std::{io, net::SocketAddr, time::Duration}; +use rustls::{client::danger::ServerCertVerifier, pki_types::InvalidDnsNameError}; +use std::{io, net::SocketAddr, sync::Arc, time::Duration}; use thiserror::Error; use tokio::net::TcpStream; use tokio_postgres::tls::MakeTlsConnect; +use tokio_postgres_rustls::MakeRustlsConnect; use tracing::{error, info, warn}; const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node"; @@ -30,7 +33,7 @@ pub enum ConnectionError { 
CouldNotConnect(#[from] io::Error), #[error("{COULD_NOT_CONNECT}: {0}")] - TlsError(#[from] native_tls::Error), + TlsError(#[from] InvalidDnsNameError), #[error("{COULD_NOT_CONNECT}: {0}")] WakeComputeError(#[from] WakeComputeError), @@ -257,7 +260,7 @@ pub struct PostgresConnection { /// Socket connected to a compute node. pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream< tokio::net::TcpStream, - postgres_native_tls::TlsStream, + tokio_postgres_rustls::RustlsStream, >, /// PostgreSQL connection parameters. pub params: std::collections::HashMap, @@ -282,12 +285,23 @@ impl ConnCfg { let (socket_addr, stream, host) = self.connect_raw(timeout).await?; drop(pause); - let tls_connector = native_tls::TlsConnector::builder() - .danger_accept_invalid_certs(allow_self_signed_compute) - .build() - .unwrap(); - let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector); - let tls = MakeTlsConnect::::make_tls_connect(&mut mk_tls, host)?; + let client_config = if allow_self_signed_compute { + // Allow all certificates for creating the connection + let verifier = Arc::new(AcceptEverythingVerifier) as Arc; + rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(verifier) + } else { + let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone(); + rustls::ClientConfig::builder().with_root_certificates(root_store) + }; + let client_config = client_config.with_no_client_auth(); + + let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config); + let tls = >::make_tls_connect( + &mut mk_tls, + host, + )?; // connect_raw() will not use TLS if sslmode is "disable" let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute); @@ -340,6 +354,58 @@ fn filtered_options(params: &StartupMessageParams) -> Option { Some(options) } +fn load_certs() -> Result, io::Error> { + let der_certs = rustls_native_certs::load_native_certs()?; + let mut store = rustls::RootCertStore::empty(); + 
store.add_parsable_certificates(der_certs); + Ok(Arc::new(store)) +} +static TLS_ROOTS: OnceCell> = OnceCell::new(); + +#[derive(Debug)] +struct AcceptEverythingVerifier; +impl ServerCertVerifier for AcceptEverythingVerifier { + fn supported_verify_schemes(&self) -> Vec { + use rustls::SignatureScheme::*; + // The schemes for which `SignatureScheme::supported_in_tls13` returns true. + vec![ + ECDSA_NISTP521_SHA512, + ECDSA_NISTP384_SHA384, + ECDSA_NISTP256_SHA256, + RSA_PSS_SHA512, + RSA_PSS_SHA384, + RSA_PSS_SHA256, + ED25519, + ] + } + fn verify_server_cert( + &self, + _end_entity: &rustls::pki_types::CertificateDer<'_>, + _intermediates: &[rustls::pki_types::CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, + _ocsp_response: &[u8], + _now: rustls::pki_types::UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 5a0c251ce2..f4707a33aa 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -1,7 +1,7 @@ use crate::{ auth::{self, backend::AuthRateLimiter}, console::locks::ApiLocks, - rate_limiter::RateBucketInfo, + rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}, scram::threadpool::ThreadPool, serverless::{cancel_set::CancelSet, GlobalConnPoolOptions}, Host, @@ -580,14 +580,18 @@ impl RetryConfig { } /// Helper for cmdline cache options parsing. 
+#[derive(serde::Deserialize)] pub struct ConcurrencyLockOptions { /// The number of shards the lock map should have pub shards: usize, /// The number of allowed concurrent requests for each endpoitn - pub permits: usize, + #[serde(flatten)] + pub limiter: RateLimiterConfig, /// Garbage collection epoch + #[serde(deserialize_with = "humantime_serde::deserialize")] pub epoch: Duration, /// Lock timeout + #[serde(deserialize_with = "humantime_serde::deserialize")] pub timeout: Duration, } @@ -596,13 +600,18 @@ impl ConcurrencyLockOptions { pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0"; /// Default options for [`crate::console::provider::ApiLocks`]. pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str = - "shards=64,permits=10,epoch=10m,timeout=10ms"; + "shards=64,permits=100,epoch=10m,timeout=10ms"; // pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s"; /// Parse lock options passed via cmdline. /// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`]. 
fn parse(options: &str) -> anyhow::Result { + let options = options.trim(); + if options.starts_with('{') && options.ends_with('}') { + return Ok(serde_json::from_str(options)?); + } + let mut shards = None; let mut permits = None; let mut epoch = None; @@ -629,9 +638,13 @@ impl ConcurrencyLockOptions { shards = Some(2); } + let permits = permits.context("missing `permits`")?; let out = Self { shards: shards.context("missing `shards`")?, - permits: permits.context("missing `permits`")?, + limiter: RateLimiterConfig { + algorithm: RateLimitAlgorithm::Fixed, + initial_limit: permits, + }, epoch: epoch.context("missing `epoch`")?, timeout: timeout.context("missing `timeout`")?, }; @@ -657,6 +670,8 @@ impl FromStr for ConcurrencyLockOptions { #[cfg(test)] mod tests { + use crate::rate_limiter::Aimd; + use super::*; #[test] @@ -684,36 +699,68 @@ mod tests { fn test_parse_lock_options() -> anyhow::Result<()> { let ConcurrencyLockOptions { epoch, - permits, + limiter, shards, timeout, } = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?; assert_eq!(epoch, Duration::from_secs(10 * 60)); assert_eq!(timeout, Duration::from_secs(1)); assert_eq!(shards, 32); - assert_eq!(permits, 4); + assert_eq!(limiter.initial_limit, 4); + assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed); let ConcurrencyLockOptions { epoch, - permits, + limiter, shards, timeout, } = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?; assert_eq!(epoch, Duration::from_secs(60)); assert_eq!(timeout, Duration::from_millis(100)); assert_eq!(shards, 16); - assert_eq!(permits, 8); + assert_eq!(limiter.initial_limit, 8); + assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed); let ConcurrencyLockOptions { epoch, - permits, + limiter, shards, timeout, } = "permits=0".parse()?; assert_eq!(epoch, Duration::ZERO); assert_eq!(timeout, Duration::ZERO); assert_eq!(shards, 2); - assert_eq!(permits, 0); + assert_eq!(limiter.initial_limit, 0); + assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed); + + 
Ok(()) + } + + #[test] + fn test_parse_json_lock_options() -> anyhow::Result<()> { + let ConcurrencyLockOptions { + epoch, + limiter, + shards, + timeout, + } = r#"{"shards":32,"initial_limit":44,"aimd":{"min":5,"max":500,"inc":10,"dec":0.9,"utilisation":0.8},"epoch":"10m","timeout":"1s"}"# + .parse()?; + assert_eq!(epoch, Duration::from_secs(10 * 60)); + assert_eq!(timeout, Duration::from_secs(1)); + assert_eq!(shards, 32); + assert_eq!(limiter.initial_limit, 44); + assert_eq!( + limiter.algorithm, + RateLimitAlgorithm::Aimd { + conf: Aimd { + min: 5, + max: 500, + dec: 0.9, + inc: 10, + utilisation: 0.8 + } + }, + ); Ok(()) } diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index 3b996cdbd1..4d074f98a5 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -15,11 +15,11 @@ use crate::{ error::ReportableError, intern::ProjectIdInt, metrics::ApiLockMetrics, + rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token}, scram, EndpointCacheKey, }; use dashmap::DashMap; use std::{hash::Hash, sync::Arc, time::Duration}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; use tracing::info; @@ -443,8 +443,8 @@ impl ApiCaches { /// Various caches for [`console`](super). 
pub struct ApiLocks { name: &'static str, - node_locks: DashMap>, - permits: usize, + node_locks: DashMap>, + config: RateLimiterConfig, timeout: Duration, epoch: std::time::Duration, metrics: &'static ApiLockMetrics, @@ -452,8 +452,6 @@ pub struct ApiLocks { #[derive(Debug, thiserror::Error)] pub enum ApiLockError { - #[error("lock was closed")] - AcquireError(#[from] tokio::sync::AcquireError), #[error("permit could not be acquired")] TimeoutError(#[from] tokio::time::error::Elapsed), } @@ -461,7 +459,6 @@ pub enum ApiLockError { impl ReportableError for ApiLockError { fn get_error_kind(&self) -> crate::error::ErrorKind { match self { - ApiLockError::AcquireError(_) => crate::error::ErrorKind::Service, ApiLockError::TimeoutError(_) => crate::error::ErrorKind::RateLimit, } } @@ -470,7 +467,7 @@ impl ReportableError for ApiLockError { impl ApiLocks { pub fn new( name: &'static str, - permits: usize, + config: RateLimiterConfig, shards: usize, timeout: Duration, epoch: std::time::Duration, @@ -479,7 +476,7 @@ impl ApiLocks { Ok(Self { name, node_locks: DashMap::with_shard_amount(shards), - permits, + config, timeout, epoch, metrics, @@ -487,8 +484,10 @@ impl ApiLocks { } pub async fn get_permit(&self, key: &K) -> Result { - if self.permits == 0 { - return Ok(WakeComputePermit { permit: None }); + if self.config.initial_limit == 0 { + return Ok(WakeComputePermit { + permit: Token::disabled(), + }); } let now = Instant::now(); let semaphore = { @@ -500,24 +499,22 @@ impl ApiLocks { .entry(key.clone()) .or_insert_with(|| { self.metrics.semaphores_registered.inc(); - Arc::new(Semaphore::new(self.permits)) + DynamicLimiter::new(self.config) }) .clone() } }; - let permit = tokio::time::timeout_at(now + self.timeout, semaphore.acquire_owned()).await; + let permit = semaphore.acquire_deadline(now + self.timeout).await; self.metrics .semaphore_acquire_seconds .observe(now.elapsed().as_secs_f64()); - Ok(WakeComputePermit { - permit: Some(permit??), - }) + Ok(WakeComputePermit 
{ permit: permit? }) } pub async fn garbage_collect_worker(&self) { - if self.permits == 0 { + if self.config.initial_limit == 0 { return; } let mut interval = @@ -547,12 +544,21 @@ impl ApiLocks { } pub struct WakeComputePermit { - // None if the lock is disabled - permit: Option, + permit: Token, } impl WakeComputePermit { pub fn should_check_cache(&self) -> bool { - self.permit.is_some() + !self.permit.is_disabled() + } + pub fn release(self, outcome: Outcome) { + self.permit.release(outcome) + } + pub fn release_result(self, res: Result) -> Result { + match res { + Ok(_) => self.release(Outcome::Success), + Err(_) => self.release(Outcome::Overload), + } + res } } diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 7728d2cafa..d72229b029 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -13,7 +13,7 @@ use crate::{ http, metrics::{CacheOutcome, Metrics}, rate_limiter::EndpointRateLimiter, - scram, EndpointCacheKey, Normalize, + scram, EndpointCacheKey, }; use crate::{cache::Cached, context::RequestMonitoring}; use futures::TryFutureExt; @@ -281,14 +281,6 @@ impl super::Api for Api { return Ok(cached); } - // check rate limit - if !self - .wake_compute_endpoint_rate_limiter - .check(user_info.endpoint.normalize().into(), 1) - { - return Err(WakeComputeError::TooManyConnections); - } - let permit = self.locks.get_permit(&key).await?; // after getting back a permit - it's possible the cache was filled @@ -301,7 +293,16 @@ impl super::Api for Api { } } - let mut node = self.do_wake_compute(ctx, user_info).await?; + // check rate limit + if !self + .wake_compute_endpoint_rate_limiter + .check(user_info.endpoint.normalize_intern(), 1) + { + info!(key = &*key, "found cached compute node info"); + return Err(WakeComputeError::TooManyConnections); + } + + let mut node = permit.release_result(self.do_wake_compute(ctx, user_info).await)?; ctx.set_project(node.aux.clone()); let cold_start_info = 
node.aux.cold_start_info; info!("woken up a compute node"); diff --git a/proxy/src/context.rs b/proxy/src/context.rs index dfd3ef108e..ff79ba8275 100644 --- a/proxy/src/context.rs +++ b/proxy/src/context.rs @@ -2,6 +2,7 @@ use chrono::Utc; use once_cell::sync::OnceCell; +use pq_proto::StartupMessageParams; use smol_str::SmolStr; use std::net::IpAddr; use tokio::sync::mpsc; @@ -46,6 +47,7 @@ pub struct RequestMonitoring { pub(crate) auth_method: Option, success: bool, pub(crate) cold_start_info: ColdStartInfo, + pg_options: Option, // extra // This sender is here to keep the request monitoring channel open while requests are taking place. @@ -102,6 +104,7 @@ impl RequestMonitoring { success: false, rejected: None, cold_start_info: ColdStartInfo::Unknown, + pg_options: None, sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()), disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()), @@ -132,6 +135,18 @@ impl RequestMonitoring { self.latency_timer.cold_start_info(info); } + pub fn set_db_options(&mut self, options: StartupMessageParams) { + self.set_application(options.get("application_name").map(SmolStr::from)); + if let Some(user) = options.get("user") { + self.set_user(user.into()); + } + if let Some(dbname) = options.get("database") { + self.set_dbname(dbname.into()); + } + + self.pg_options = Some(options); + } + pub fn set_project(&mut self, x: MetricsAuxInfo) { if self.endpoint_id.is_none() { self.set_endpoint_id(x.endpoint_id.as_str().into()) @@ -155,8 +170,10 @@ impl RequestMonitoring { } } - pub fn set_application(&mut self, app: Option) { - self.application = app.or_else(|| self.application.clone()); + fn set_application(&mut self, app: Option) { + if let Some(app) = app { + self.application = Some(app); + } } pub fn set_dbname(&mut self, dbname: DbName) { diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index a213a32ca4..1355b7e1d8 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -13,7 
+13,9 @@ use parquet::{ }, record::RecordWriter, }; +use pq_proto::StartupMessageParams; use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel}; +use serde::ser::SerializeMap; use tokio::{sync::mpsc, time}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, Span}; @@ -87,6 +89,7 @@ pub struct RequestData { database: Option, project: Option, branch: Option, + pg_options: Option, auth_method: Option<&'static str>, error: Option<&'static str>, /// Success is counted if we form a HTTP response with sql rows inside @@ -101,6 +104,23 @@ pub struct RequestData { disconnect_timestamp: Option, } +struct Options<'a> { + options: &'a StartupMessageParams, +} + +impl<'a> serde::Serialize for Options<'a> { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + let mut state = s.serialize_map(None)?; + for (k, v) in self.options.iter() { + state.serialize_entry(k, v)?; + } + state.end() + } +} + impl From<&RequestMonitoring> for RequestData { fn from(value: &RequestMonitoring) -> Self { Self { @@ -113,6 +133,10 @@ impl From<&RequestMonitoring> for RequestData { database: value.dbname.as_deref().map(String::from), project: value.project.as_deref().map(String::from), branch: value.branch.as_deref().map(String::from), + pg_options: value + .pg_options + .as_ref() + .and_then(|options| serde_json::to_string(&Options { options }).ok()), auth_method: value.auth_method.as_ref().map(|x| match x { super::AuthMethod::Web => "web", super::AuthMethod::ScramSha256 => "scram_sha_256", @@ -494,6 +518,7 @@ mod tests { database: Some(hex::encode(rng.gen::<[u8; 16]>())), project: Some(hex::encode(rng.gen::<[u8; 16]>())), branch: Some(hex::encode(rng.gen::<[u8; 16]>())), + pg_options: None, auth_method: None, protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)], region: "us-east-1", @@ -570,15 +595,15 @@ mod tests { assert_eq!( file_stats, [ - (1315314, 3, 6000), - (1315307, 3, 6000), - (1315367, 3, 6000), - (1315324, 3, 6000), - (1315454, 3, 
6000), - (1315296, 3, 6000), - (1315088, 3, 6000), - (1315324, 3, 6000), - (438713, 1, 2000) + (1315874, 3, 6000), + (1315867, 3, 6000), + (1315927, 3, 6000), + (1315884, 3, 6000), + (1316014, 3, 6000), + (1315856, 3, 6000), + (1315648, 3, 6000), + (1315884, 3, 6000), + (438913, 1, 2000) ] ); @@ -608,11 +633,11 @@ mod tests { assert_eq!( file_stats, [ - (1222212, 5, 10000), - (1228362, 5, 10000), - (1230156, 5, 10000), - (1229518, 5, 10000), - (1220796, 5, 10000) + (1223214, 5, 10000), + (1229364, 5, 10000), + (1231158, 5, 10000), + (1230520, 5, 10000), + (1221798, 5, 10000) ] ); @@ -644,11 +669,11 @@ mod tests { assert_eq!( file_stats, [ - (1207859, 5, 10000), - (1207590, 5, 10000), - (1207883, 5, 10000), - (1207871, 5, 10000), - (1208126, 5, 10000) + (1208861, 5, 10000), + (1208592, 5, 10000), + (1208885, 5, 10000), + (1208873, 5, 10000), + (1209128, 5, 10000) ] ); @@ -673,15 +698,15 @@ mod tests { assert_eq!( file_stats, [ - (1315314, 3, 6000), - (1315307, 3, 6000), - (1315367, 3, 6000), - (1315324, 3, 6000), - (1315454, 3, 6000), - (1315296, 3, 6000), - (1315088, 3, 6000), - (1315324, 3, 6000), - (438713, 1, 2000) + (1315874, 3, 6000), + (1315867, 3, 6000), + (1315927, 3, 6000), + (1315884, 3, 6000), + (1316014, 3, 6000), + (1315856, 3, 6000), + (1315648, 3, 6000), + (1315884, 3, 6000), + (438913, 1, 2000) ] ); @@ -718,7 +743,7 @@ mod tests { // files are smaller than the size threshold, but they took too long to fill so were flushed early assert_eq!( file_stats, - [(659462, 2, 3001), (659176, 2, 3000), (658972, 2, 2999)] + [(659836, 2, 3001), (659550, 2, 3000), (659346, 2, 2999)] ); tmpdir.close().unwrap(); diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 35c1616481..ea92eaaa55 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -3,6 +3,7 @@ use std::convert::Infallible; use anyhow::{bail, Context}; +use intern::{EndpointIdInt, EndpointIdTag, InternId}; use tokio::task::JoinError; use tokio_util::sync::CancellationToken; use tracing::warn; @@ -129,20 
+130,22 @@ macro_rules! smol_str_wrapper { const POOLER_SUFFIX: &str = "-pooler"; -pub trait Normalize { - fn normalize(&self) -> Self; -} - -impl + From> Normalize for S { +impl EndpointId { fn normalize(&self) -> Self { - if self.as_ref().ends_with(POOLER_SUFFIX) { - let mut s = self.as_ref().to_string(); - s.truncate(s.len() - POOLER_SUFFIX.len()); - s.into() + if let Some(stripped) = self.as_ref().strip_suffix(POOLER_SUFFIX) { + stripped.into() } else { self.clone() } } + + fn normalize_intern(&self) -> EndpointIdInt { + if let Some(stripped) = self.as_ref().strip_suffix(POOLER_SUFFIX) { + EndpointIdTag::get_interner().get_or_intern(stripped) + } else { + self.into() + } + } } // 90% of role name strings are 20 characters or less. diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 5824b70df9..95b46ae002 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -267,6 +267,8 @@ pub async fn handle_client( }; drop(pause); + ctx.set_db_options(params.clone()); + let hostname = mode.hostname(stream.get_ref()); let common_names = tls.map(|tls| &tls.common_names); diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index c8528d0296..409d45b39a 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -84,8 +84,8 @@ impl ConnectMechanism for TcpMechanism<'_> { timeout: time::Duration, ) -> Result { let host = node_info.config.get_host()?; - let _permit = self.locks.get_permit(&host).await?; - node_info.connect(ctx, timeout).await + let permit = self.locks.get_permit(&host).await?; + permit.release_result(node_info.connect(ctx, timeout).await) } fn update_connect_config(&self, config: &mut compute::ConnCfg) { diff --git a/proxy/src/rate_limiter.rs b/proxy/src/rate_limiter.rs index c542267547..be9072dd8c 100644 --- a/proxy/src/rate_limiter.rs +++ b/proxy/src/rate_limiter.rs @@ -1,2 +1,6 @@ +mod limit_algorithm; mod limiter; +pub use limit_algorithm::{ + aimd::Aimd, DynamicLimiter, Outcome, 
RateLimitAlgorithm, RateLimiterConfig, Token, +}; pub use limiter::{BucketRateLimiter, EndpointRateLimiter, GlobalRateLimiter, RateBucketInfo}; diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs new file mode 100644 index 0000000000..072fdb80b0 --- /dev/null +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -0,0 +1,275 @@ +//! Algorithms for controlling concurrency limits. +use parking_lot::Mutex; +use std::{pin::pin, sync::Arc, time::Duration}; +use tokio::{ + sync::Notify, + time::{error::Elapsed, timeout_at, Instant}, +}; + +use self::aimd::Aimd; + +pub mod aimd; + +/// Whether a job succeeded or failed as a result of congestion/overload. +/// +/// Errors not considered to be caused by overload should be ignored. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Outcome { + /// The job succeeded, or failed in a way unrelated to overload. + Success, + /// The job failed because of overload, e.g. it timed out or an explicit backpressure signal + /// was observed. + Overload, +} + +/// An algorithm for controlling a concurrency limit. +pub trait LimitAlgorithm: Send + Sync + 'static { + /// Update the concurrency limit in response to a new job completion. + fn update(&self, old_limit: usize, sample: Sample) -> usize; +} + +/// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay). +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +pub struct Sample { + pub(crate) latency: Duration, + /// Jobs in flight when the sample was taken. 
+ pub(crate) in_flight: usize, + pub(crate) outcome: Outcome, +} + +#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum RateLimitAlgorithm { + #[default] + Fixed, + Aimd { + #[serde(flatten)] + conf: Aimd, + }, +} + +pub struct Fixed; + +impl LimitAlgorithm for Fixed { + fn update(&self, old_limit: usize, _sample: Sample) -> usize { + old_limit + } +} + +#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)] +pub struct RateLimiterConfig { + #[serde(flatten)] + pub algorithm: RateLimitAlgorithm, + pub initial_limit: usize, +} + +impl RateLimiterConfig { + pub fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> { + match self.algorithm { + RateLimitAlgorithm::Fixed => Box::new(Fixed), + RateLimitAlgorithm::Aimd { conf } => Box::new(conf), + } + } +} + +pub struct LimiterInner { + alg: Box<dyn LimitAlgorithm>, + available: usize, + limit: usize, + in_flight: usize, +} + +impl LimiterInner { + fn update(&mut self, latency: Duration, outcome: Option<Outcome>) { + if let Some(outcome) = outcome { + let sample = Sample { + latency, + in_flight: self.in_flight, + outcome, + }; + self.limit = self.alg.update(self.limit, sample); + } + } + + fn take(&mut self, ready: &Notify) -> Option<()> { + if self.available >= 1 { // `>= 1`: the last remaining permit must be grantable too, else a limit of N admits only N - 1 jobs + self.available -= 1; + self.in_flight += 1; + + // tell the next in the queue that there is still a permit ready + if self.available >= 1 { + ready.notify_one(); + } + Some(()) + } else { + None + } + } +} + +/// Limits the number of concurrent jobs. +/// +/// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the +/// token once the job is finished. +/// +/// The limit will be automatically adjusted based on observed latency (delay) and/or failures +/// caused by overload (loss). +pub struct DynamicLimiter { + config: RateLimiterConfig, + inner: Mutex<LimiterInner>, + // to notify when a token is available + ready: Notify, +} + +/// A concurrency token, required to run a job.
+/// +/// Release the token back to the [`DynamicLimiter`] after the job is complete. +pub struct Token { + start: Instant, + limiter: Option>, +} + +/// A snapshot of the state of the [`DynamicLimiter`]. +/// +/// Not guaranteed to be consistent under high concurrency. +#[derive(Debug, Clone, Copy)] +pub struct LimiterState { + limit: usize, + in_flight: usize, +} + +impl DynamicLimiter { + /// Create a limiter with a given limit control algorithm. + pub fn new(config: RateLimiterConfig) -> Arc { + let ready = Notify::new(); + ready.notify_one(); + + Arc::new(Self { + inner: Mutex::new(LimiterInner { + alg: config.create_rate_limit_algorithm(), + available: config.initial_limit, + limit: config.initial_limit, + in_flight: 0, + }), + ready, + config, + }) + } + + /// Try to acquire a concurrency [Token], waiting for `duration` if there are none available. + /// + /// Returns `Err` (the timeout error) if there are none available after `duration`. + pub async fn acquire_timeout(self: &Arc, duration: Duration) -> Result { + self.acquire_deadline(Instant::now() + duration).await + } + + /// Try to acquire a concurrency [Token], waiting until `deadline` if there are none available. + /// + /// Returns `Err` (the timeout error) if there are none available after `deadline`. + pub async fn acquire_deadline(self: &Arc, deadline: Instant) -> Result { + if self.config.initial_limit == 0 { + // If the rate limiter is disabled, we can always acquire a token.
+ Ok(Token::disabled()) + } else { + let mut notified = pin!(self.ready.notified()); + let mut ready = notified.as_mut().enable(); + loop { + let mut limit = None; + if ready { + let mut inner = self.inner.lock(); + if inner.take(&self.ready).is_some() { + break Ok(Token::new(self.clone())); + } + limit = Some(inner.limit); + } + match timeout_at(deadline, notified.as_mut()).await { + Ok(()) => ready = true, + Err(e) => { + let limit = limit.unwrap_or_else(|| self.inner.lock().limit); + tracing::info!(limit, "could not acquire token in time"); + break Err(e); + } + } + } + } + } + + /// Return the concurrency [Token], along with the outcome of the job. + /// + /// The [Outcome] of the job, and the time taken to perform it, may be used + /// to update the concurrency limit. + /// + /// Set the outcome to `None` to ignore the job. + fn release_inner(&self, start: Instant, outcome: Option<Outcome>) { + tracing::info!("outcome is {:?}", outcome); + if self.config.initial_limit == 0 { + return; + } + + let mut inner = self.inner.lock(); + + inner.update(start.elapsed(), outcome); + // The sample above still counts this job; drop it from `in_flight` before recomputing availability so its slot is handed back. + inner.in_flight -= 1; + if inner.in_flight < inner.limit { + inner.available = inner.limit - inner.in_flight; + // At least 1 permit is now available + self.ready.notify_one(); + } + } + + /// The current state of the limiter.
+ pub fn state(&self) -> LimiterState { + let inner = self.inner.lock(); + LimiterState { + limit: inner.limit, + in_flight: inner.in_flight, + } + } +} + +impl Token { + fn new(limiter: Arc) -> Self { + Self { + start: Instant::now(), + limiter: Some(limiter), + } + } + pub fn disabled() -> Self { + Self { + start: Instant::now(), + limiter: None, + } + } + + pub fn is_disabled(&self) -> bool { + self.limiter.is_none() + } + + pub fn release(mut self, outcome: Outcome) { + self.release_mut(Some(outcome)) + } + + pub fn release_mut(&mut self, outcome: Option) { + if let Some(limiter) = self.limiter.take() { + limiter.release_inner(self.start, outcome); + } + } +} + +impl Drop for Token { + fn drop(&mut self) { + self.release_mut(None) + } +} + +impl LimiterState { + /// The current concurrency limit. + pub fn limit(&self) -> usize { + self.limit + } + /// The number of jobs in flight. + pub fn in_flight(&self) -> usize { + self.in_flight + } +} diff --git a/proxy/src/rate_limiter/limit_algorithm/aimd.rs b/proxy/src/rate_limiter/limit_algorithm/aimd.rs new file mode 100644 index 0000000000..370d4be802 --- /dev/null +++ b/proxy/src/rate_limiter/limit_algorithm/aimd.rs @@ -0,0 +1,184 @@ +use std::usize; + +use super::{LimitAlgorithm, Outcome, Sample}; + +/// Loss-based congestion avoidance. +/// +/// Additive-increase, multiplicative decrease. +/// +/// Adds available concurrency when: +/// 1. no load-based errors are observed, and +/// 2. the utilisation of the current limit is high. +/// +/// Reduces available concurrency by a factor when load-based errors are detected. +#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)] +pub struct Aimd { + /// Minimum limit for AIMD algorithm. + pub min: usize, + /// Maximum limit for AIMD algorithm. + pub max: usize, + /// Factor the current limit is multiplied by when an overload is observed. + pub dec: f32, + /// Amount added to the current limit after a success at high utilisation. + pub inc: usize, + /// A threshold below which the limit won't be increased.
+ pub utilisation: f32, +} + +impl LimitAlgorithm for Aimd { + fn update(&self, old_limit: usize, sample: Sample) -> usize { + use Outcome::*; + match sample.outcome { + Success => { + let utilisation = sample.in_flight as f32 / old_limit as f32; + + if utilisation > self.utilisation { + let limit = old_limit + self.inc; + let increased_limit = limit.clamp(self.min, self.max); + if increased_limit > old_limit { + tracing::info!(increased_limit, "limit increased"); + } + + increased_limit + } else { + old_limit + } + } + Overload => { + let limit = old_limit as f32 * self.dec; + + // Floor instead of round, so the limit reduces even with small numbers. + // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1 + let limit = limit.floor() as usize; + + limit.clamp(self.min, self.max) + } + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::rate_limiter::limit_algorithm::{ + DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig, + }; + + use super::*; + + #[tokio::test(start_paused = true)] + async fn should_decrease_limit_on_overload() { + let config = RateLimiterConfig { + initial_limit: 10, + algorithm: RateLimitAlgorithm::Aimd { + conf: Aimd { + min: 1, + max: 1500, + inc: 10, + dec: 0.5, + utilisation: 0.8, + }, + }, + }; + + let limiter = DynamicLimiter::new(config); + + let token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + token.release(Outcome::Overload); + + assert_eq!(limiter.state().limit(), 5, "overload: decrease"); + } + + #[tokio::test(start_paused = true)] + async fn should_increase_limit_on_success_when_using_gt_util_threshold() { + let config = RateLimiterConfig { + initial_limit: 4, + algorithm: RateLimitAlgorithm::Aimd { + conf: Aimd { + min: 1, + max: 1500, + inc: 1, + dec: 0.5, + utilisation: 0.5, + }, + }, + }; + + let limiter = DynamicLimiter::new(config); + + let token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + let _token = limiter + 
.acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + let _token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + + token.release(Outcome::Success); + assert_eq!(limiter.state().limit(), 5, "success: increase"); + } + + #[tokio::test(start_paused = true)] + async fn should_not_change_limit_on_success_when_using_lt_util_threshold() { + let config = RateLimiterConfig { + initial_limit: 4, + algorithm: RateLimitAlgorithm::Aimd { + conf: Aimd { + min: 1, + max: 1500, + inc: 10, + dec: 0.5, + utilisation: 0.5, + }, + }, + }; + + let limiter = DynamicLimiter::new(config); + + let token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + + token.release(Outcome::Success); + assert_eq!( + limiter.state().limit(), + 4, + "success: ignore when < half limit" + ); + } + + #[tokio::test(start_paused = true)] + async fn should_not_change_limit_when_no_outcome() { + let config = RateLimiterConfig { + initial_limit: 10, + algorithm: RateLimitAlgorithm::Aimd { + conf: Aimd { + min: 1, + max: 1500, + inc: 10, + dec: 0.5, + utilisation: 0.5, + }, + }, + }; + + let limiter = DynamicLimiter::new(config); + + let token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + drop(token); + assert_eq!(limiter.state().limit(), 10, "ignore"); + } +} diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 52fc7b556a..a40c66a80d 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -232,9 +232,9 @@ impl ConnectMechanism for TokioMechanism { .connect_timeout(timeout); let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute); - let (client, connection) = config.connect(tokio_postgres::NoTls).await?; + let res = config.connect(tokio_postgres::NoTls).await; drop(pause); - drop(permit); + let (client, connection) = permit.release_result(res)?; tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id())); 
Ok(poll_client( diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 5376bddfd3..9d6a475aeb 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -17,6 +17,7 @@ use hyper1::http::HeaderValue; use hyper1::Response; use hyper1::StatusCode; use hyper1::{HeaderMap, Request}; +use pq_proto::StartupMessageParamsBuilder; use serde_json::json; use serde_json::Value; use tokio::time; @@ -192,13 +193,13 @@ fn get_conn_info( let mut options = Option::None; + let mut params = StartupMessageParamsBuilder::default(); + params.insert("user", &username); + params.insert("database", &dbname); for (key, value) in pairs { - match &*key { - "options" => { - options = Some(NeonOptions::parse_options_raw(&value)); - } - "application_name" => ctx.set_application(Some(value.into())), - _ => {} + params.insert(&key, &value); + if key == "options" { + options = Some(NeonOptions::parse_options_raw(&value)); } } diff --git a/pyproject.toml b/pyproject.toml index 131d1121f7..c7f1a07512 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ build-backend = "poetry.core.masonry.api" exclude = [ "^vendor/", "^target/", + "test_runner/performance/pgvector/loaddata.py", ] check_untyped_defs = true # Help mypy find imports when running against list of individual files. 
diff --git a/s3_scrubber/Cargo.toml b/s3_scrubber/Cargo.toml index dd5d453a2b..e56bd43fb8 100644 --- a/s3_scrubber/Cargo.toml +++ b/s3_scrubber/Cargo.toml @@ -22,8 +22,7 @@ serde_with.workspace = true workspace_hack.workspace = true utils.workspace = true async-stream.workspace = true -native-tls.workspace = true -postgres-native-tls.workspace = true +tokio-postgres-rustls.workspace = true postgres_ffi.workspace = true tokio-stream.workspace = true tokio-postgres.workspace = true @@ -31,6 +30,9 @@ tokio-util = { workspace = true } futures-util.workspace = true itertools.workspace = true camino.workspace = true +rustls.workspace = true +rustls-native-certs.workspace = true +once_cell.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } chrono = { workspace = true, default-features = false, features = ["clock", "serde"] } diff --git a/s3_scrubber/src/scan_safekeeper_metadata.rs b/s3_scrubber/src/scan_safekeeper_metadata.rs index 73dd49ceb5..24051b03de 100644 --- a/s3_scrubber/src/scan_safekeeper_metadata.rs +++ b/s3_scrubber/src/scan_safekeeper_metadata.rs @@ -1,7 +1,8 @@ -use std::{collections::HashSet, str::FromStr}; +use std::{collections::HashSet, str::FromStr, sync::Arc}; use aws_sdk_s3::Client; use futures::stream::{StreamExt, TryStreamExt}; +use once_cell::sync::OnceCell; use pageserver_api::shard::TenantShardId; use postgres_ffi::{XLogFileName, PG_TLI}; use serde::Serialize; @@ -70,9 +71,12 @@ pub async fn scan_safekeeper_metadata( "checking bucket {}, region {}, dump_db_table {}", bucket_config.bucket, bucket_config.region, dump_db_table ); - // Use the native TLS implementation (Neon requires TLS) - let tls_connector = - postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new().unwrap()); + // Use rustls (Neon requires TLS) + let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone(); + let client_config = rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); 
+ let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config); let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?; // The connection object performs the actual communication with the database, // so spawn it off to run on its own. @@ -234,3 +238,11 @@ async fn check_timeline( is_deleted: false, }) } + +fn load_certs() -> Result, std::io::Error> { + let der_certs = rustls_native_certs::load_native_certs()?; + let mut store = rustls::RootCertStore::empty(); + store.add_parsable_certificates(der_certs); + Ok(Arc::new(store)) +} +static TLS_ROOTS: OnceCell> = OnceCell::new(); diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 808bb1e490..4aacd3421d 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -287,6 +287,26 @@ async fn timeline_files_handler(request: Request) -> Result .map_err(|e| ApiError::InternalServerError(e.into())) } +/// Force persist control file and remove old WAL. +async fn timeline_checkpoint_handler(request: Request) -> Result, ApiError> { + check_permission(&request, None)?; + + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + + let tli = GlobalTimelines::get(ttid)?; + tli.maybe_persist_control_file(true) + .await + .map_err(ApiError::InternalServerError)?; + tli.remove_old_wal() + .await + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, ()) +} + /// Deactivates the timeline and removes its data directory. 
async fn timeline_delete_handler(mut request: Request) -> Result, ApiError> { let ttid = TenantTimelineId::new( @@ -553,6 +573,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file", |r| request_span(r, patch_control_file_handler), ) + .post( + "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint", + |r| request_span(r, timeline_checkpoint_handler), + ) // for tests .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| { request_span(r, record_safekeeper_info) diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 93b51f32c0..f7cc40f58a 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -11,6 +11,7 @@ use tracing::info; use utils::{ id::{TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, + pausable_failpoint, }; use crate::{ @@ -162,6 +163,8 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result filenames.remove(control_file_index); filenames.insert(0, "safekeeper.control".to_string()); + pausable_failpoint!("sk-pull-timeline-after-list-pausable"); + info!( "downloading {} files from safekeeper {}", filenames.len(), @@ -183,6 +186,13 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result let mut file = tokio::fs::File::create(&file_path).await?; let mut response = client.get(&http_url).send().await?; + if response.status() != reqwest::StatusCode::OK { + bail!( + "pulling file {} failed: status is {}", + filename, + response.status() + ); + } while let Some(chunk) = response.chunk().await? 
{ file.write_all(&chunk).await?; file.flush().await?; diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index 98ce671182..3400eee9b7 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -15,7 +15,7 @@ pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> { for tli in &tlis { let ttid = tli.ttid; async { - if let Err(e) = tli.maybe_persist_control_file().await { + if let Err(e) = tli.maybe_persist_control_file(false).await { warn!("failed to persist control file: {e}"); } if let Err(e) = tli.remove_old_wal().await { diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 4b1481a397..2a620f5fef 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -827,9 +827,9 @@ where /// Persist control file if there is something to save and enough time /// passed after the last save. - pub async fn maybe_persist_inmem_control_file(&mut self) -> Result { + pub async fn maybe_persist_inmem_control_file(&mut self, force: bool) -> Result { const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300); - if self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL { + if !force && self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL { return Ok(false); } let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 0cc6153373..f30c503382 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -821,9 +821,9 @@ impl Timeline { /// passed after the last save. This helps to keep remote_consistent_lsn up /// to date so that storage nodes restart doesn't cause many pageserver -> /// safekeeper reconnections. 
- pub async fn maybe_persist_control_file(self: &Arc) -> Result<()> { + pub async fn maybe_persist_control_file(self: &Arc, force: bool) -> Result<()> { let mut guard = self.write_shared_state().await; - let changed = guard.sk.maybe_persist_inmem_control_file().await?; + let changed = guard.sk.maybe_persist_inmem_control_file(force).await?; guard.skip_update = !changed; Ok(()) } diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index e74ba37ad8..ed544352f9 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -106,7 +106,7 @@ pub async fn main_task( if !is_active { // TODO: maybe use tokio::spawn? - if let Err(e) = tli.maybe_persist_control_file().await { + if let Err(e) = tli.maybe_persist_control_file(false).await { warn!("control file save in update_status failed: {:?}", e); } } diff --git a/test_runner/fixtures/common_types.py b/test_runner/fixtures/common_types.py index b5458b5c26..e9be765669 100644 --- a/test_runner/fixtures/common_types.py +++ b/test_runner/fixtures/common_types.py @@ -5,6 +5,8 @@ from typing import Any, Type, TypeVar, Union T = TypeVar("T", bound="Id") +DEFAULT_WAL_SEG_SIZE = 16 * 1024 * 1024 + @total_ordering class Lsn: @@ -67,6 +69,9 @@ class Lsn: def as_int(self) -> int: return self.lsn_int + def segment_lsn(self, seg_sz: int = DEFAULT_WAL_SEG_SIZE) -> "Lsn": + return Lsn(self.lsn_int - (self.lsn_int % seg_sz)) + @dataclass(frozen=True) class Key: diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 36aa18f1f9..b8ef63faa9 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3771,7 +3771,7 @@ class SafekeeperPort: @dataclass -class Safekeeper: +class Safekeeper(LogUtils): """An object representing a running safekeeper daemon.""" env: NeonEnv @@ -3779,6 +3779,13 @@ class Safekeeper: id: int running: bool = False + def __init__(self, env: NeonEnv, port: SafekeeperPort, id: int, 
running: bool = False): + self.env = env + self.port = port + self.id = id + self.running = running + self.logfile = Path(self.data_dir) / f"safekeeper-{id}.log" + def start(self, extra_opts: Optional[List[str]] = None) -> "Safekeeper": assert self.running is False self.env.neon_cli.safekeeper_start(self.id, extra_opts=extra_opts) @@ -3839,11 +3846,38 @@ class Safekeeper: port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled ) - def data_dir(self) -> str: - return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}") + def get_timeline_start_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn: + timeline_status = self.http_client().timeline_status(tenant_id, timeline_id) + timeline_start_lsn = timeline_status.timeline_start_lsn + log.info(f"sk {self.id} timeline start LSN: {timeline_start_lsn}") + return timeline_start_lsn - def timeline_dir(self, tenant_id, timeline_id) -> str: - return os.path.join(self.data_dir(), str(tenant_id), str(timeline_id)) + def get_flush_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn: + timeline_status = self.http_client().timeline_status(tenant_id, timeline_id) + flush_lsn = timeline_status.flush_lsn + log.info(f"sk {self.id} flush LSN: {flush_lsn}") + return flush_lsn + + def pull_timeline( + self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId + ) -> Dict[str, Any]: + """ + pull_timeline from srcs to self. 
+ """ + src_https = [f"http://localhost:{sk.port.http}" for sk in srcs] + res = self.http_client().pull_timeline( + {"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https} + ) + src_ids = [sk.id for sk in srcs] + log.info(f"finished pulling timeline from {src_ids} to {self.id}") + return res + + @property + def data_dir(self) -> Path: + return self.env.repo_dir / "safekeepers" / f"sk{self.id}" + + def timeline_dir(self, tenant_id, timeline_id) -> Path: + return self.data_dir / str(tenant_id) / str(timeline_id) def list_segments(self, tenant_id, timeline_id) -> List[str]: """ @@ -3856,6 +3890,35 @@ class Safekeeper: segments.sort() return segments + def checkpoint_up_to(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn): + """ + Assuming pageserver(s) uploaded to s3 up to `lsn`, + 1) wait for remote_consistent_lsn and wal_backup_lsn on safekeeper to reach it. + 2) checkpoint timeline on safekeeper, which should remove WAL before this LSN. + """ + cli = self.http_client() + + def are_lsns_advanced(): + stat = cli.timeline_status(tenant_id, timeline_id) + log.info( + f"waiting for remote_consistent_lsn and backup_lsn on sk {self.id} to reach {lsn}, currently remote_consistent_lsn={stat.remote_consistent_lsn}, backup_lsn={stat.backup_lsn}" + ) + assert stat.remote_consistent_lsn >= lsn and stat.backup_lsn >= lsn.segment_lsn() + + # xxx: max wait is long because we might be waiting for reconnection from + # pageserver to this safekeeper + wait_until(30, 1, are_lsns_advanced) + cli.checkpoint(tenant_id, timeline_id) + + def wait_until_paused(self, failpoint: str): + msg = f"at failpoint {failpoint}" + + def paused(): + log.info(f"waiting for hitting failpoint {failpoint}") + self.assert_log_contains(msg) + + wait_until(20, 0.5, paused) + class S3Scrubber: def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None): diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index 
82148d0556..a5480f557f 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -177,6 +177,13 @@ class SafekeeperHttpClient(requests.Session): ) res.raise_for_status() + def checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId): + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", + json={}, + ) + res.raise_for_status() + # only_local doesn't remove segments in the remote storage. def timeline_delete( self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 89e116df28..b55329e054 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -196,7 +196,7 @@ def query_scalar(cur: cursor, query: str) -> Any: # Traverse directory to get total size. -def get_dir_size(path: str) -> int: +def get_dir_size(path: Path) -> int: """Return size in bytes.""" totalbytes = 0 for root, _dirs, files in os.walk(path): @@ -560,3 +560,25 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: Set[str elapsed = time.time() - started_at log.info(f"assert_pageserver_backups_equal completed in {elapsed}s") + + +class PropagatingThread(threading.Thread): + _target: Any + _args: Any + _kwargs: Any + """ + Simple Thread wrapper with join() propagating the possible exception in the thread. 
+ """ + + def run(self): + self.exc = None + try: + self.ret = self._target(*self._args, **self._kwargs) + except BaseException as e: + self.exc = e + + def join(self, timeout=None): + super(PropagatingThread, self).join(timeout) + if self.exc: + raise self.exc + return self.ret diff --git a/test_runner/performance/pgvector/HNSW_build.sql b/test_runner/performance/pgvector/HNSW_build.sql new file mode 100644 index 0000000000..9e6918b755 --- /dev/null +++ b/test_runner/performance/pgvector/HNSW_build.sql @@ -0,0 +1,47 @@ + +\set ECHO queries +\timing + +-- prepare test table +DROP TABLE IF EXISTS hnsw_test_table; +CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA; +INSERT INTO hnsw_test_table SELECT * FROM documents; +CREATE INDEX ON hnsw_test_table (_id); -- needed later for random tuple queries +-- tune index build params +SET max_parallel_maintenance_workers = 7; +SET maintenance_work_mem = '8GB'; +-- create HNSW index for the supported distance metrics +CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_cosine_ops); +CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_ip_ops); +CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops); +CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops); +CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops); +-- note: in a second psql session we can monitor the progress of the index build phases using +-- the following query: +-- SELECT phase, round(100.0 * blocks_done / nullif(blocks_total, 0), 1) AS "%" FROM pg_stat_progress_create_index; + +-- show all indexes built on the table +SELECT + idx.relname AS index_name, + tbl.relname AS table_name, + am.amname AS access_method, + a.attname AS column_name, + opc.opcname AS operator_class +FROM + pg_index i +JOIN + pg_class idx ON idx.oid = i.indexrelid +JOIN + pg_class tbl ON tbl.oid = i.indrelid +JOIN + pg_am am ON am.oid = idx.relam +JOIN 
+ pg_attribute a ON a.attrelid = tbl.oid AND a.attnum = ANY(i.indkey) +JOIN + pg_opclass opc ON opc.oid = i.indclass[0] +WHERE + tbl.relname = 'hnsw_test_table' + AND a.attname = 'embeddings'; + +-- show table sizes +\dt+ diff --git a/test_runner/performance/pgvector/IVFFLAT_build.sql b/test_runner/performance/pgvector/IVFFLAT_build.sql new file mode 100644 index 0000000000..338980831a --- /dev/null +++ b/test_runner/performance/pgvector/IVFFLAT_build.sql @@ -0,0 +1,52 @@ + +\set ECHO queries +\timing + +-- prepare test table +DROP TABLE IF EXISTS ivfflat_test_table; +CREATE TABLE ivfflat_test_table AS TABLE documents WITH NO DATA; +INSERT INTO ivfflat_test_table SELECT * FROM documents; +CREATE INDEX ON ivfflat_test_table (_id); -- needed later for random tuple queries +-- tune index build params +SET max_parallel_maintenance_workers = 7; +SET maintenance_work_mem = '8GB'; +-- create ivfflat index for the supported distance metrics +-- the formula for lists is # rows / 1000, or sqrt(# rows) if # rows > 1 million +-- we have 1 million embeddings of vector size 1536 in column embeddings of table documents +-- so we use 1000 lists +CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_l2_ops) WITH (lists = 1000); +CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_ip_ops) WITH (lists = 1000); +CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_cosine_ops) WITH (lists = 1000); +CREATE INDEX ON ivfflat_test_table USING ivfflat ((embeddings::halfvec(1536)) halfvec_l2_ops) WITH (lists = 1000); +CREATE INDEX ON ivfflat_test_table + USING ivfflat ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops) WITH (lists = 1000); + +\d ivfflat_test_table + + +-- show all indexes built on the table +SELECT + idx.relname AS index_name, + tbl.relname AS table_name, + am.amname AS access_method, + a.attname AS column_name, + opc.opcname AS operator_class +FROM + pg_index i +JOIN + pg_class idx ON idx.oid = i.indexrelid +JOIN + pg_class
tbl ON tbl.oid = i.indrelid +JOIN + pg_am am ON am.oid = idx.relam +JOIN + pg_attribute a ON a.attrelid = tbl.oid AND a.attnum = ANY(i.indkey) +JOIN + pg_opclass opc ON opc.oid = i.indclass[0] +WHERE + tbl.relname = 'ivfflat_test_table' + AND a.attname = 'embeddings'; +-- show table sizes +\dt+ + + diff --git a/test_runner/performance/pgvector/README.md b/test_runner/performance/pgvector/README.md new file mode 100644 index 0000000000..83495d270a --- /dev/null +++ b/test_runner/performance/pgvector/README.md @@ -0,0 +1,55 @@ +# Source of the dataset for pgvector tests + +This readme was copied from https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-1M + +## Download the parquet files + +```bash +brew install git-lfs +git-lfs clone https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-1M +``` + +## Load into postgres: + +see loaddata.py in this directory + +## Rest of dataset card as on huggingface + +--- +dataset_info: + features: + - name: _id + dtype: string + - name: title + dtype: string + - name: text + dtype: string + - name: text-embedding-3-large-1536-embedding + sequence: float64 + splits: + - name: train + num_bytes: 12679725776 + num_examples: 1000000 + download_size: 9551862565 + dataset_size: 12679725776 +configs: +- config_name: default + data_files: + - split: train + path: data/train-* +license: mit +task_categories: +- feature-extraction +language: +- en +size_categories: +- 1M ") + + +def main(conn_str, directory_path): + # Connection to PostgreSQL + with psycopg2.connect(conn_str) as conn: + with conn.cursor() as cursor: + # Run SQL statements + cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") + register_vector(conn) + cursor.execute("DROP TABLE IF EXISTS documents;") + cursor.execute( + """ + CREATE TABLE documents ( + _id TEXT PRIMARY KEY, + title TEXT, + text TEXT, + embeddings vector(1536) -- text-embedding-3-large-1536-embedding (OpenAI) + ); + """ + ) + 
conn.commit() + + # List and sort Parquet files + parquet_files = sorted(Path(directory_path).glob("*.parquet")) + + for file in parquet_files: + print(f"Loading {file} into PostgreSQL") + df = pd.read_parquet(file) + + print(df.head()) + + data_list = [ + ( + row["_id"], + row["title"], + row["text"], + np.array(row["text-embedding-3-large-1536-embedding"]), + ) + for index, row in df.iterrows() + ] + # Use execute_values to perform batch insertion + execute_values( + cursor, + "INSERT INTO documents (_id, title, text, embeddings) VALUES %s", + data_list, + ) + # Commit after we insert all embeddings + conn.commit() + + print(f"Loaded {file} into PostgreSQL") + + +if __name__ == "__main__": + if len(sys.argv) != 3: + print_usage() + sys.exit(1) + + conn_str = sys.argv[1] + directory_path = sys.argv[2] + main(conn_str, directory_path) diff --git a/test_runner/performance/pgvector/pgbench_custom_script_pgvector_hsnw_queries.sql b/test_runner/performance/pgvector/pgbench_custom_script_pgvector_hsnw_queries.sql new file mode 100644 index 0000000000..886ae9645b --- /dev/null +++ b/test_runner/performance/pgvector/pgbench_custom_script_pgvector_hsnw_queries.sql @@ -0,0 +1,10 @@ +with x (x) as ( + select "embeddings" as x + from hnsw_test_table + TABLESAMPLE SYSTEM (1) + LIMIT 1 +) +SELECT title, "embeddings" <=> (select x from x) as distance +FROM hnsw_test_table +ORDER BY 2 +LIMIT 30; diff --git a/test_runner/performance/pgvector/pgbench_hnsw_queries.sql b/test_runner/performance/pgvector/pgbench_hnsw_queries.sql new file mode 100644 index 0000000000..5034063c1b --- /dev/null +++ b/test_runner/performance/pgvector/pgbench_hnsw_queries.sql @@ -0,0 +1,13 @@ +-- run with pooled connection +-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql -postgresql://neondb_owner:@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require" + +with x (x) as ( + select "embeddings" as x + from hnsw_test_table + TABLESAMPLE SYSTEM (1) + LIMIT 1 +) +SELECT 
title, "embeddings" <=> (select x from x) as distance +FROM hnsw_test_table +ORDER BY 2 +LIMIT 30; diff --git a/test_runner/performance/test_perf_olap.py b/test_runner/performance/test_perf_olap.py index 8a9509ea44..2367676e67 100644 --- a/test_runner/performance/test_perf_olap.py +++ b/test_runner/performance/test_perf_olap.py @@ -100,6 +100,25 @@ QUERIES: Tuple[LabelledQuery, ...] = ( ) # fmt: on +# A list of pgvector HNSW index builds to run. +# Please do not alter the label for the query, as it is used to identify it. +# +# Disable auto formatting for the list of queries so that it's easier to read +# fmt: off +PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = ( + LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"), + LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"), + LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"), + LabelledQuery("PGV3", r"CREATE INDEX ON hnsw_test_table (_id);"), + LabelledQuery("PGV4", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_cosine_ops);"), + LabelledQuery("PGV5", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_ip_ops);"), + LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"), + LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"), + LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"), +) +# fmt: on + + EXPLAIN_STRING: str = "EXPLAIN (ANALYZE, VERBOSE, BUFFERS, COSTS, SETTINGS, FORMAT JSON)" @@ -245,3 +264,18 @@ def test_clickbench_collect_pg_stat_statements(remote_compare: RemoteCompare): log.info("Collecting pg_stat_statements") query = LabelledQuery("Q_COLLECT_PG_STAT_STATEMENTS", r"SELECT * from pg_stat_statements;") run_psql(remote_compare, query, times=1, explain=False) + + +@pytest.mark.parametrize("query", 
PGVECTOR_QUERIES) +@pytest.mark.remote_cluster +def test_pgvector_indexing(query: LabelledQuery, remote_compare: RemoteCompare): + """ + A pgvector test that tests HNSW index build performance and parallelism. + + The DB is prepared manually in advance. + See + - test_runner/performance/pgvector/README.md + - test_runner/performance/pgvector/loaddata.py + - test_runner/performance/pgvector/HNSW_build.sql + """ + run_psql(remote_compare, query, times=1, explain=False) diff --git a/test_runner/performance/test_perf_pgbench.py b/test_runner/performance/test_perf_pgbench.py index 2b8760dff2..d756d6eeca 100644 --- a/test_runner/performance/test_perf_pgbench.py +++ b/test_runner/performance/test_perf_pgbench.py @@ -17,6 +17,7 @@ class PgBenchLoadType(enum.Enum): INIT = "init" SIMPLE_UPDATE = "simple-update" SELECT_ONLY = "select-only" + PGVECTOR_HNSW = "pgvector-hnsw" def utc_now_timestamp() -> int: @@ -132,6 +133,26 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P password=password, ) + if workload_type == PgBenchLoadType.PGVECTOR_HNSW: + # Run pgvector HNSW query workload + run_pgbench( + env, + "pgvector-hnsw", + [ + "pgbench", + "-f", + "test_runner/performance/pgvector/pgbench_custom_script_pgvector_hsnw_queries.sql", + "-c100", + "-j20", + f"-T{duration}", + "-P2", + "--protocol=prepared", + "--progress-timestamp", + connstr, + ], + password=password, + ) + env.report_size() @@ -201,3 +222,13 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur @pytest.mark.remote_cluster def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int): run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY) + + +# The following test runs on an existing database that has pgvector extension installed +# and a table with 1 million embedding vectors loaded and indexed with HNSW. +# +# Run this pgbench test against an existing remote Postgres cluster with the necessary setup.
+@pytest.mark.parametrize("duration", get_durations_matrix()) +@pytest.mark.remote_cluster +def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int): + run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW) diff --git a/test_runner/regress/test_gin_redo.py b/test_runner/regress/test_gin_redo.py new file mode 100644 index 0000000000..9205882239 --- /dev/null +++ b/test_runner/regress/test_gin_redo.py @@ -0,0 +1,22 @@ +import time + +from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup + + +# +# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error +# +def test_gin_redo(neon_simple_env: NeonEnv): + env = neon_simple_env + + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + time.sleep(1) + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + con = primary.connect() + cur = con.cursor() + cur.execute("create table gin_test_tbl(id integer, i int4[])") + cur.execute("create index gin_test_idx on gin_test_tbl using gin (i)") + cur.execute("insert into gin_test_tbl select g,array[3, 1, g] from generate_series(1, 10000) g") + cur.execute("delete from gin_test_tbl where id % 2 = 0") + cur.execute("vacuum gin_test_tbl") + wait_replica_caughtup(primary, secondary) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index ea66eeff63..cff13e74ee 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -23,7 +23,6 @@ from fixtures.log_helper import log from fixtures.metrics import parse_metrics from fixtures.neon_fixtures import ( Endpoint, - NeonEnv, NeonEnvBuilder, NeonPageserver, PgBin, @@ -48,7 +47,7 @@ from fixtures.remote_storage import ( ) from fixtures.safekeeper.http import SafekeeperHttpClient from fixtures.safekeeper.utils import are_walreceivers_absent -from fixtures.utils import get_dir_size, query_scalar, start_in_background +from fixtures.utils import 
PropagatingThread, get_dir_size, query_scalar, start_in_background def wait_lsn_force_checkpoint( @@ -360,7 +359,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): # We will wait for first segment removal. Make sure they exist for starter. first_segments = [ - os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id), "000000010000000000000001") + sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001" for sk in env.safekeepers ] assert all(os.path.exists(p) for p in first_segments) @@ -445,7 +444,7 @@ def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: Tim def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb): http_cli = sk.http_client() tli_status = http_cli.timeline_status(tenant_id, timeline_id) - sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id))) + sk_wal_size = get_dir_size(sk.timeline_dir(tenant_id, timeline_id)) sk_wal_size_mb = sk_wal_size / 1024 / 1024 log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}") return sk_wal_size_mb <= target_size_mb @@ -591,10 +590,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder): # save the last (partial) file to put it back after recreation; others will be fetched from s3 sk = env.safekeepers[0] - tli_dir = Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) + tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id) f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0]) f_partial_path = tli_dir / f_partial - f_partial_saved = Path(sk.data_dir()) / f_partial.name + f_partial_saved = Path(sk.data_dir) / f_partial.name f_partial_path.rename(f_partial_saved) pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version @@ -616,7 +615,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder): cli = sk.http_client() cli.timeline_create(tenant_id, timeline_id, pg_version, 
last_lsn) f_partial_path = ( - Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name + Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name ) shutil.copy(f_partial_saved, f_partial_path) @@ -1132,8 +1131,8 @@ def cmp_sk_wal(sks: List[Safekeeper], tenant_id: TenantId, timeline_id: Timeline ) for f in mismatch: - f1 = os.path.join(sk0.timeline_dir(tenant_id, timeline_id), f) - f2 = os.path.join(sk.timeline_dir(tenant_id, timeline_id), f) + f1 = sk0.timeline_dir(tenant_id, timeline_id) / f + f2 = sk.timeline_dir(tenant_id, timeline_id) / f stdout_filename = f"{f2}.filediff" with open(stdout_filename, "w") as stdout_f: @@ -1631,7 +1630,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): with conn.cursor() as cur: cur.execute("CREATE TABLE t(key int primary key)") sk = env.safekeepers[0] - sk_data_dir = Path(sk.data_dir()) + sk_data_dir = sk.data_dir if not auth_enabled: sk_http = sk.http_client() sk_http_other = sk_http @@ -1724,9 +1723,6 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): def test_pull_timeline(neon_env_builder: NeonEnvBuilder): - def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str: - return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names]) - def execute_payload(endpoint: Endpoint): with closing(endpoint.connect()) as conn: with conn.cursor() as cur: @@ -1812,6 +1808,65 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder): show_statuses(env.safekeepers, tenant_id, timeline_id) +# Test pull_timeline while concurrently gc'ing WAL on safekeeper: +# 1) Start pull_timeline, listing files to fetch. +# 2) Write segment, do gc. +# 3) Finish pull_timeline. +# 4) Do some write, verify integrity with timeline_digest. +# Expected to fail while holding off WAL gc plus fetching commit_lsn WAL +# segment is not implemented. 
+@pytest.mark.xfail +def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 3 + neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) + env = neon_env_builder.init_start() + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2]) + + log.info("use only first 2 safekeepers, 3rd will be seeded") + endpoint = env.endpoints.create("main") + endpoint.active_safekeepers = [1, 2] + endpoint.start() + endpoint.safe_psql("create table t(key int, value text)") + endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'") + + src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id) + log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}") + + dst_http = dst_sk.http_client() + # run pull_timeline which will halt before downloading files + dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause")) + pt_handle = PropagatingThread( + target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id) + ) + pt_handle.start() + dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable") + + # ensure segment exists + endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'") + lsn = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id) + assert lsn > Lsn("0/2000000") + # Checkpoint timeline beyond lsn. 
+ src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn) + first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001" + log.info(f"first segment exist={os.path.exists(first_segment_p)}") + + dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off")) + pt_handle.join() + + timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id) + dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id) + log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}") + assert dst_flush_lsn >= src_flush_lsn + digests = [ + sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, dst_flush_lsn) + for sk in [src_sk, dst_sk] + ] + assert digests[0] == digests[1], f"digest on src is {digests[0]} but on dst is {digests[1]}" + + # In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries # when compute is active, but there are no writes to the timeline. In that case # pageserver should maintain a single connection to safekeeper and don't attempt diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index b5d86de574..715d22eed8 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -531,6 +531,64 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder): asyncio.run(run_recovery_uncommitted(env)) +async def run_wal_truncation(env: NeonEnv): + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + (sk1, sk2, sk3) = env.safekeepers + + ep = env.endpoints.create_start("main") + ep.safe_psql("create table t (key int, value text)") + ep.safe_psql("insert into t select generate_series(1, 100), 'payload'") + + # insert with only one sk3 up to create tail of flushed but not committed WAL on it + sk1.stop() + sk2.stop() + conn = await ep.connect_async() + # query should hang, so execute in separate task + bg_query = asyncio.create_task( + 
conn.execute("insert into t select generate_series(1, 180000), 'Papaya'") + ) + sleep_sec = 2 + await asyncio.sleep(sleep_sec) + # it must still be not finished + assert not bg_query.done() + # note: destroy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers. + ep.stop_and_destroy() + + # stop sk3 as well + sk3.stop() + + # now start sk1 and sk2 and make them commit something + sk1.start() + sk2.start() + ep = env.endpoints.create_start( + "main", + ) + ep.safe_psql("insert into t select generate_series(1, 200), 'payload'") + + # start sk3 and wait for it to catch up + sk3.start() + flush_lsn = Lsn(ep.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()")) + await wait_for_lsn(sk3, tenant_id, timeline_id, flush_lsn) + + timeline_start_lsn = sk1.get_timeline_start_lsn(tenant_id, timeline_id) + digests = [ + sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, flush_lsn) + for sk in [sk1, sk3] + ] + assert digests[0] == digests[1], f"digest on sk1 is {digests[0]} but on sk3 is {digests[1]}" + + +# Simple deterministic test creating tail of WAL on safekeeper which is +# truncated when majority without this sk elects walproposer starting earlier.
+def test_wal_truncation(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + asyncio.run(run_wal_truncation(env)) + + async def run_segment_init_failure(env: NeonEnv): env.neon_cli.create_branch("test_segment_init_failure") ep = env.endpoints.create_start("test_segment_init_failure") diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 0d30e28f74..17e0f5ff4e 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 0d30e28f74f49fe6a27a6bd45dcfeb1060656b8f +Subproject commit 17e0f5ff4e1905691aa40e1e08f9b79b14c99652 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 74fb144890..c2c3d40534 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 74fb144890c4f955db1ef50ee1eeb9d8a6c2f69d +Subproject commit c2c3d40534db97d83dd7e185d1971e707fa2f445 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 3c2b9d576c..b228f20372 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 3c2b9d576c580e0b5b7108001f959b8c5b42e0a2 +Subproject commit b228f20372ebcabfd7946647cb7adbd38bacb14a diff --git a/vendor/revisions.json b/vendor/revisions.json index 2f16f334c5..5bf4e289ef 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "v16": ["16.3", "3c2b9d576c580e0b5b7108001f959b8c5b42e0a2"], - "v15": ["15.7", "74fb144890c4f955db1ef50ee1eeb9d8a6c2f69d"], - "v14": ["14.12", "0d30e28f74f49fe6a27a6bd45dcfeb1060656b8f"] + "v16": ["16.3", "b228f20372ebcabfd7946647cb7adbd38bacb14a"], + "v15": ["15.7", "c2c3d40534db97d83dd7e185d1971e707fa2f445"], + "v14": ["14.12", "17e0f5ff4e1905691aa40e1e08f9b79b14c99652"] } diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index f364a6c2e0..df16c71789 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -59,7 +59,7 @@ regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, 
features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } regex-syntax = { version = "0.8" } reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] } -reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "default-tls", "stream"] } +reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "rustls-tls", "stream"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive"] } @@ -68,7 +68,7 @@ sha2 = { version = "0.10", features = ["asm"] } smallvec = { version = "1", default-features = false, features = ["const_new", "write"] } subtle = { version = "2" } sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] } -time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] } +time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] } tokio-rustls = { version = "0.24" } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }