mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 04:00:38 +00:00
Compare commits
14 Commits
release-56
...
drop_buffe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
890143fcfc | ||
|
|
ab706fc88d | ||
|
|
9a081c230f | ||
|
|
fddd11dd1a | ||
|
|
238fa47bee | ||
|
|
b0a954bde2 | ||
|
|
7ac11d3942 | ||
|
|
c8cebecabf | ||
|
|
14df69d0e3 | ||
|
|
352b08d0be | ||
|
|
f9f69a2ee7 | ||
|
|
fabeff822f | ||
|
|
4a0ce9512b | ||
|
|
d61e924103 |
14
.github/workflows/actionlint.yml
vendored
14
.github/workflows/actionlint.yml
vendored
@@ -24,7 +24,7 @@ jobs:
|
||||
|
||||
actionlint:
|
||||
needs: [ check-permissions ]
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: reviewdog/action-actionlint@v1
|
||||
@@ -36,3 +36,15 @@ jobs:
|
||||
fail_on_error: true
|
||||
filter_mode: nofilter
|
||||
level: error
|
||||
- run: |
|
||||
PAT='^\s*runs-on:.*-latest'
|
||||
if grep -ERq $PAT .github/workflows
|
||||
then
|
||||
grep -ERl $PAT .github/workflows |\
|
||||
while read -r f
|
||||
do
|
||||
l=$(grep -nE $PAT .github/workflows/release.yml | awk -F: '{print $1}' | head -1)
|
||||
echo "::error file=$f,line=$l::Please, do not use ubuntu-latest images to run on, use LTS instead."
|
||||
done
|
||||
exit 1
|
||||
fi
|
||||
|
||||
6
.github/workflows/approved-for-ci-run.yml
vendored
6
.github/workflows/approved-for-ci-run.yml
vendored
@@ -44,7 +44,7 @@ jobs:
|
||||
contains(fromJSON('["opened", "synchronize", "reopened", "closed"]'), github.event.action) &&
|
||||
contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- run: gh pr --repo "${GITHUB_REPOSITORY}" edit "${PR_NUMBER}" --remove-label "approved-for-ci-run"
|
||||
@@ -60,7 +60,7 @@ jobs:
|
||||
github.event.action == 'labeled' &&
|
||||
contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- run: gh pr --repo "${GITHUB_REPOSITORY}" edit "${PR_NUMBER}" --remove-label "approved-for-ci-run"
|
||||
@@ -109,7 +109,7 @@ jobs:
|
||||
github.event.action == 'closed' &&
|
||||
github.event.pull_request.head.repo.full_name != github.repository
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- name: Close PR and delete `ci-run/pr-${{ env.PR_NUMBER }}` branch
|
||||
|
||||
102
.github/workflows/benchmarking.yml
vendored
102
.github/workflows/benchmarking.yml
vendored
@@ -38,6 +38,11 @@ on:
|
||||
description: 'AWS-RDS and AWS-AURORA normally only run on Saturday. Set this to true to run them on every workflow_dispatch'
|
||||
required: false
|
||||
default: false
|
||||
run_only_pgvector_tests:
|
||||
type: boolean
|
||||
description: 'Run pgvector tests but no other tests. If not set, all tests including pgvector tests will be run'
|
||||
required: false
|
||||
default: false
|
||||
|
||||
defaults:
|
||||
run:
|
||||
@@ -50,6 +55,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
bench:
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
env:
|
||||
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
|
||||
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
|
||||
@@ -120,6 +126,7 @@ jobs:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
generate-matrices:
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
|
||||
#
|
||||
# Available platforms:
|
||||
@@ -130,7 +137,7 @@ jobs:
|
||||
# - rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
|
||||
env:
|
||||
RUN_AWS_RDS_AND_AURORA: ${{ github.event.inputs.run_AWS_RDS_AND_AURORA || 'false' }}
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
outputs:
|
||||
pgbench-compare-matrix: ${{ steps.pgbench-compare-matrix.outputs.matrix }}
|
||||
olap-compare-matrix: ${{ steps.olap-compare-matrix.outputs.matrix }}
|
||||
@@ -197,6 +204,7 @@ jobs:
|
||||
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
|
||||
|
||||
pgbench-compare:
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
needs: [ generate-matrices ]
|
||||
|
||||
strategy:
|
||||
@@ -343,6 +351,92 @@ jobs:
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
pgbench-pgvector:
|
||||
env:
|
||||
TEST_PG_BENCH_DURATIONS_MATRIX: "15m"
|
||||
TEST_PG_BENCH_SCALES_MATRIX: "1"
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
DEFAULT_PG_VERSION: 16
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: "neon-captest-pgvector"
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Download Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: neon-${{ runner.os }}-release-artifact
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
|
||||
- name: Add Postgres binaries to PATH
|
||||
run: |
|
||||
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
|
||||
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Set up Connection String
|
||||
id: set-up-connstr
|
||||
run: |
|
||||
CONNSTR=${{ secrets.BENCHMARK_PGVECTOR_CONNSTR }}
|
||||
|
||||
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
|
||||
|
||||
QUERIES=("SELECT version()")
|
||||
QUERIES+=("SHOW neon.tenant_id")
|
||||
QUERIES+=("SHOW neon.timeline_id")
|
||||
|
||||
for q in "${QUERIES[@]}"; do
|
||||
psql ${CONNSTR} -c "${q}"
|
||||
done
|
||||
|
||||
- name: Benchmark pgvector hnsw indexing
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance/test_perf_olap.py
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 21600 -k test_pgvector_indexing
|
||||
env:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
|
||||
- name: Benchmark pgvector hnsw queries
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
|
||||
- name: Create Allure report
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
|
||||
- name: Post to a Slack channel
|
||||
if: ${{ github.event.schedule && failure() }}
|
||||
uses: slackapi/slack-github-action@v1
|
||||
with:
|
||||
channel-id: "C033QLM5P7D" # dev-staging-stream
|
||||
slack-message: "Periodic perf testing neon-captest-pgvector: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
|
||||
clickbench-compare:
|
||||
# ClichBench DB for rds-aurora and rds-Postgres deployed to the same clusters
|
||||
# we use for performance testing in pgbench-compare.
|
||||
@@ -351,7 +445,7 @@ jobs:
|
||||
#
|
||||
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
|
||||
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
|
||||
if: ${{ !cancelled() }}
|
||||
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
|
||||
needs: [ generate-matrices, pgbench-compare ]
|
||||
|
||||
strategy:
|
||||
@@ -455,7 +549,7 @@ jobs:
|
||||
# We might change it after https://github.com/neondatabase/neon/issues/2900.
|
||||
#
|
||||
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
|
||||
if: ${{ !cancelled() }}
|
||||
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
|
||||
needs: [ generate-matrices, clickbench-compare ]
|
||||
|
||||
strategy:
|
||||
@@ -557,7 +651,7 @@ jobs:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
user-examples-compare:
|
||||
if: ${{ !cancelled() }}
|
||||
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
|
||||
needs: [ generate-matrices, tpch-compare ]
|
||||
|
||||
strategy:
|
||||
|
||||
@@ -88,7 +88,7 @@ jobs:
|
||||
|
||||
merge-images:
|
||||
needs: [ build-image ]
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
env:
|
||||
IMAGE_TAG: ${{ inputs.image-tag }}
|
||||
|
||||
12
.github/workflows/build_and_test.yml
vendored
12
.github/workflows/build_and_test.yml
vendored
@@ -35,7 +35,7 @@ jobs:
|
||||
cancel-previous-e2e-tests:
|
||||
needs: [ check-permissions ]
|
||||
if: github.event_name == 'pull_request'
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- name: Cancel previous e2e-tests runs for this PR
|
||||
@@ -549,7 +549,7 @@ jobs:
|
||||
report-benchmarks-failures:
|
||||
needs: [ benchmarks, create-test-report ]
|
||||
if: github.ref_name == 'main' && failure() && needs.benchmarks.result == 'failure'
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- uses: slackapi/slack-github-action@v1
|
||||
@@ -774,7 +774,7 @@ jobs:
|
||||
|
||||
neon-image:
|
||||
needs: [ neon-image-arch, tag ]
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- uses: docker/login-action@v3
|
||||
@@ -884,7 +884,7 @@ jobs:
|
||||
|
||||
compute-node-image:
|
||||
needs: [ compute-node-image-arch, tag ]
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -1032,7 +1032,7 @@ jobs:
|
||||
|
||||
promote-images:
|
||||
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
env:
|
||||
VERSIONS: v14 v15 v16
|
||||
@@ -1077,7 +1077,7 @@ jobs:
|
||||
|
||||
trigger-custom-extensions-build-and-wait:
|
||||
needs: [ check-permissions, tag ]
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Set PR's status to pending and request a remote CI test
|
||||
run: |
|
||||
|
||||
@@ -19,7 +19,7 @@ permissions: {}
|
||||
|
||||
jobs:
|
||||
check-image:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
outputs:
|
||||
tag: ${{ steps.get-build-tools-tag.outputs.image-tag }}
|
||||
found: ${{ steps.check-image.outputs.found }}
|
||||
|
||||
2
.github/workflows/check-permissions.yml
vendored
2
.github/workflows/check-permissions.yml
vendored
@@ -16,7 +16,7 @@ permissions: {}
|
||||
|
||||
jobs:
|
||||
check-permissions:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Disallow CI runs on PRs from forks
|
||||
if: |
|
||||
|
||||
@@ -9,7 +9,7 @@ on:
|
||||
|
||||
jobs:
|
||||
cleanup:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Cleanup
|
||||
run: |
|
||||
|
||||
2
.github/workflows/pg_clients.yml
vendored
2
.github/workflows/pg_clients.yml
vendored
@@ -20,7 +20,7 @@ concurrency:
|
||||
jobs:
|
||||
test-postgres-client-libs:
|
||||
# TODO: switch to gen2 runner, requires docker
|
||||
runs-on: [ ubuntu-latest ]
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
env:
|
||||
DEFAULT_PG_VERSION: 14
|
||||
|
||||
2
.github/workflows/pin-build-tools-image.yml
vendored
2
.github/workflows/pin-build-tools-image.yml
vendored
@@ -26,7 +26,7 @@ permissions: {}
|
||||
|
||||
jobs:
|
||||
tag-image:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
env:
|
||||
FROM_TAG: ${{ inputs.from-tag }}
|
||||
|
||||
2
.github/workflows/release-notify.yml
vendored
2
.github/workflows/release-notify.yml
vendored
@@ -19,7 +19,7 @@ on:
|
||||
|
||||
jobs:
|
||||
notify:
|
||||
runs-on: [ ubuntu-latest ]
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- uses: neondatabase/dev-actions/release-pr-notify@main
|
||||
|
||||
4
.github/workflows/release.yml
vendored
4
.github/workflows/release.yml
vendored
@@ -26,7 +26,7 @@ defaults:
|
||||
jobs:
|
||||
create-storage-release-branch:
|
||||
if: ${{ github.event.schedule == '0 6 * * MON' || format('{0}', inputs.create-storage-release-branch) == 'true' }}
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
permissions:
|
||||
contents: write # for `git push`
|
||||
@@ -65,7 +65,7 @@ jobs:
|
||||
|
||||
create-proxy-release-branch:
|
||||
if: ${{ github.event.schedule == '0 6 * * THU' || format('{0}', inputs.create-proxy-release-branch) == 'true' }}
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
permissions:
|
||||
contents: write # for `git push`
|
||||
|
||||
6
.github/workflows/trigger-e2e-tests.yml
vendored
6
.github/workflows/trigger-e2e-tests.yml
vendored
@@ -19,7 +19,7 @@ env:
|
||||
jobs:
|
||||
cancel-previous-e2e-tests:
|
||||
if: github.event_name == 'pull_request'
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- name: Cancel previous e2e-tests runs for this PR
|
||||
@@ -31,7 +31,7 @@ jobs:
|
||||
--field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}"
|
||||
|
||||
tag:
|
||||
runs-on: [ ubuntu-latest ]
|
||||
runs-on: ubuntu-22.04
|
||||
outputs:
|
||||
build-tag: ${{ steps.build-tag.outputs.tag }}
|
||||
|
||||
@@ -62,7 +62,7 @@ jobs:
|
||||
|
||||
trigger-e2e-tests:
|
||||
needs: [ tag ]
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
env:
|
||||
TAG: ${{ needs.tag.outputs.build-tag }}
|
||||
steps:
|
||||
|
||||
156
Cargo.lock
generated
156
Cargo.lock
generated
@@ -776,7 +776,6 @@ dependencies = [
|
||||
"pin-project",
|
||||
"serde",
|
||||
"time",
|
||||
"tz-rs",
|
||||
"url",
|
||||
"uuid",
|
||||
]
|
||||
@@ -1291,12 +1290,6 @@ dependencies = [
|
||||
"tiny-keccak",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const_fn"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935"
|
||||
|
||||
[[package]]
|
||||
name = "const_format"
|
||||
version = "0.2.30"
|
||||
@@ -1976,21 +1969,6 @@ version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
|
||||
dependencies = [
|
||||
"foreign-types-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types-shared"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
|
||||
|
||||
[[package]]
|
||||
name = "form_urlencoded"
|
||||
version = "1.1.0"
|
||||
@@ -2620,19 +2598,6 @@ dependencies = [
|
||||
"tokio-io-timeout",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-tls"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"hyper 0.14.26",
|
||||
"native-tls",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.3"
|
||||
@@ -3168,24 +3133,6 @@ version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||
|
||||
[[package]]
|
||||
name = "native-tls"
|
||||
version = "0.2.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"openssl",
|
||||
"openssl-probe",
|
||||
"openssl-sys",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
"security-framework-sys",
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.25.1"
|
||||
@@ -3356,15 +3303,6 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_threads"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "oauth2"
|
||||
version = "4.4.2"
|
||||
@@ -3414,50 +3352,12 @@ version = "11.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.60"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"cfg-if",
|
||||
"foreign-types",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"openssl-macros",
|
||||
"openssl-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-macros"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-probe"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.96"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.20.0"
|
||||
@@ -4105,17 +4005,6 @@ dependencies = [
|
||||
"tokio-postgres",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-native-tls"
|
||||
version = "0.5.0"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
"tokio-postgres",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
@@ -4224,6 +4113,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"itertools",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
"rand 0.8.5",
|
||||
@@ -4413,6 +4303,7 @@ dependencies = [
|
||||
"http 1.1.0",
|
||||
"http-body-util",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"hyper 0.14.26",
|
||||
"hyper 1.2.0",
|
||||
"hyper-util",
|
||||
@@ -4423,7 +4314,6 @@ dependencies = [
|
||||
"md5",
|
||||
"measured",
|
||||
"metrics",
|
||||
"native-tls",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"parking_lot 0.12.1",
|
||||
@@ -4431,7 +4321,6 @@ dependencies = [
|
||||
"parquet_derive",
|
||||
"pbkdf2",
|
||||
"pin-project-lite",
|
||||
"postgres-native-tls",
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"pq_proto",
|
||||
@@ -4450,6 +4339,7 @@ dependencies = [
|
||||
"rstest",
|
||||
"rustc-hash",
|
||||
"rustls 0.22.4",
|
||||
"rustls-native-certs 0.7.0",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
@@ -4479,7 +4369,6 @@ dependencies = [
|
||||
"utils",
|
||||
"uuid",
|
||||
"walkdir",
|
||||
"webpki-roots 0.25.2",
|
||||
"workspace_hack",
|
||||
"x509-parser",
|
||||
]
|
||||
@@ -4786,20 +4675,21 @@ dependencies = [
|
||||
"http 0.2.9",
|
||||
"http-body 0.4.5",
|
||||
"hyper 0.14.26",
|
||||
"hyper-tls",
|
||||
"hyper-rustls 0.24.0",
|
||||
"ipnet",
|
||||
"js-sys",
|
||||
"log",
|
||||
"mime",
|
||||
"native-tls",
|
||||
"once_cell",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.21.11",
|
||||
"rustls-pemfile 1.0.2",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_urlencoded",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
"tokio-rustls 0.24.0",
|
||||
"tokio-util",
|
||||
"tower-service",
|
||||
"url",
|
||||
@@ -4807,6 +4697,7 @@ dependencies = [
|
||||
"wasm-bindgen-futures",
|
||||
"wasm-streams 0.3.0",
|
||||
"web-sys",
|
||||
"webpki-roots 0.25.2",
|
||||
"winreg 0.50.0",
|
||||
]
|
||||
|
||||
@@ -5232,20 +5123,22 @@ dependencies = [
|
||||
"hex",
|
||||
"histogram",
|
||||
"itertools",
|
||||
"native-tls",
|
||||
"once_cell",
|
||||
"pageserver",
|
||||
"pageserver_api",
|
||||
"postgres-native-tls",
|
||||
"postgres_ffi",
|
||||
"rand 0.8.5",
|
||||
"remote_storage",
|
||||
"reqwest 0.12.4",
|
||||
"rustls 0.22.4",
|
||||
"rustls-native-certs 0.7.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres-rustls",
|
||||
"tokio-rustls 0.25.0",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
@@ -6189,8 +6082,6 @@ checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"js-sys",
|
||||
"libc",
|
||||
"num_threads",
|
||||
"serde",
|
||||
"time-core",
|
||||
"time-macros",
|
||||
@@ -6300,16 +6191,6 @@ dependencies = [
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-native-tls"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
@@ -6716,15 +6597,6 @@ version = "1.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
|
||||
|
||||
[[package]]
|
||||
name = "tz-rs"
|
||||
version = "0.6.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33851b15c848fad2cf4b105c6bb66eb9512b6f6c44a4b13f57c53c73c707e2b4"
|
||||
dependencies = [
|
||||
"const_fn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "uname"
|
||||
version = "0.1.1"
|
||||
@@ -7629,9 +7501,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zeroize"
|
||||
version = "1.6.0"
|
||||
version = "1.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
|
||||
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
|
||||
dependencies = [
|
||||
"zeroize_derive",
|
||||
]
|
||||
|
||||
15
Cargo.toml
15
Cargo.toml
@@ -46,10 +46,10 @@ anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
atomic-take = "1.1.0"
|
||||
azure_core = "0.19"
|
||||
azure_identity = "0.19"
|
||||
azure_storage = "0.19"
|
||||
azure_storage_blobs = "0.19"
|
||||
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
|
||||
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage_blobs = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
@@ -114,7 +114,6 @@ md5 = "0.7.0"
|
||||
measured = { version = "0.0.21", features=["lasso"] }
|
||||
measured-process = { version = "0.0.21" }
|
||||
memoffset = "0.8"
|
||||
native-tls = "0.2"
|
||||
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
|
||||
notify = "6.0.0"
|
||||
num_cpus = "1.15"
|
||||
@@ -191,7 +190,7 @@ url = "2.2"
|
||||
urlencoding = "2.1"
|
||||
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
|
||||
walkdir = "2.3.2"
|
||||
webpki-roots = "0.25"
|
||||
rustls-native-certs = "0.7"
|
||||
x509-parser = "0.15"
|
||||
|
||||
## TODO replace this with tracing
|
||||
@@ -200,7 +199,6 @@ log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
@@ -241,8 +239,7 @@ tonic-build = "0.9"
|
||||
|
||||
[patch.crates-io]
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
# TODO: we should probably fork `tokio-postgres-rustls` instead.
|
||||
# Needed to get `tokio-postgres-rustls` to depend on our fork.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
# bug fixes for UUID
|
||||
|
||||
@@ -99,6 +99,13 @@ name = "async-executor"
|
||||
[[bans.deny]]
|
||||
name = "smol"
|
||||
|
||||
[[bans.deny]]
|
||||
# We want to use rustls instead of the platform's native tls implementation.
|
||||
name = "native-tls"
|
||||
|
||||
[[bans.deny]]
|
||||
name = "openssl"
|
||||
|
||||
# This section is considered when running `cargo deny check sources`.
|
||||
# More documentation about the 'sources' section can be found here:
|
||||
# https://embarkstudios.github.io/cargo-deny/checks/sources/cfg.html
|
||||
|
||||
@@ -7,6 +7,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
bytes.workspace = true
|
||||
byteorder.workspace = true
|
||||
itertools.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
rand.workspace = true
|
||||
|
||||
@@ -7,8 +7,9 @@ pub mod framed;
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{borrow::Cow, collections::HashMap, fmt, io, str};
|
||||
use std::{borrow::Cow, fmt, io, str};
|
||||
|
||||
// re-export for use in utils pageserver_feedback.rs
|
||||
pub use postgres_protocol::PG_EPOCH;
|
||||
@@ -50,15 +51,37 @@ pub enum FeStartupPacket {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct StartupMessageParamsBuilder {
|
||||
params: BytesMut,
|
||||
}
|
||||
|
||||
impl StartupMessageParamsBuilder {
|
||||
/// Set parameter's value by its name.
|
||||
/// name and value must not contain a \0 byte
|
||||
pub fn insert(&mut self, name: &str, value: &str) {
|
||||
self.params.put(name.as_bytes());
|
||||
self.params.put(&b"\0"[..]);
|
||||
self.params.put(value.as_bytes());
|
||||
self.params.put(&b"\0"[..]);
|
||||
}
|
||||
|
||||
pub fn freeze(self) -> StartupMessageParams {
|
||||
StartupMessageParams {
|
||||
params: self.params.freeze(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct StartupMessageParams {
|
||||
params: HashMap<String, String>,
|
||||
params: Bytes,
|
||||
}
|
||||
|
||||
impl StartupMessageParams {
|
||||
/// Get parameter's value by its name.
|
||||
pub fn get(&self, name: &str) -> Option<&str> {
|
||||
self.params.get(name).map(|s| s.as_str())
|
||||
self.iter().find_map(|(k, v)| (k == name).then_some(v))
|
||||
}
|
||||
|
||||
/// Split command-line options according to PostgreSQL's logic,
|
||||
@@ -112,15 +135,19 @@ impl StartupMessageParams {
|
||||
|
||||
/// Iterate through key-value pairs in an arbitrary order.
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
|
||||
self.params.iter().map(|(k, v)| (k.as_str(), v.as_str()))
|
||||
let params =
|
||||
std::str::from_utf8(&self.params).expect("should be validated as utf8 already");
|
||||
params.split_terminator('\0').tuples()
|
||||
}
|
||||
|
||||
// This function is mostly useful in tests.
|
||||
#[doc(hidden)]
|
||||
pub fn new<'a, const N: usize>(pairs: [(&'a str, &'a str); N]) -> Self {
|
||||
Self {
|
||||
params: pairs.map(|(k, v)| (k.to_owned(), v.to_owned())).into(),
|
||||
let mut b = StartupMessageParamsBuilder::default();
|
||||
for (k, v) in pairs {
|
||||
b.insert(k, v)
|
||||
}
|
||||
b.freeze()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,35 +372,21 @@ impl FeStartupPacket {
|
||||
(major_version, minor_version) => {
|
||||
// StartupMessage
|
||||
|
||||
// Parse pairs of null-terminated strings (key, value).
|
||||
// See `postgres: ProcessStartupPacket, build_startup_packet`.
|
||||
let mut tokens = str::from_utf8(&msg)
|
||||
.map_err(|_e| {
|
||||
ProtocolError::BadMessage("StartupMessage params: invalid utf-8".to_owned())
|
||||
})?
|
||||
.strip_suffix('\0') // drop packet's own null
|
||||
.ok_or_else(|| {
|
||||
ProtocolError::Protocol(
|
||||
"StartupMessage params: missing null terminator".to_string(),
|
||||
)
|
||||
})?
|
||||
.split_terminator('\0');
|
||||
|
||||
let mut params = HashMap::new();
|
||||
while let Some(name) = tokens.next() {
|
||||
let value = tokens.next().ok_or_else(|| {
|
||||
ProtocolError::Protocol(
|
||||
"StartupMessage params: key without value".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
params.insert(name.to_owned(), value.to_owned());
|
||||
}
|
||||
let s = str::from_utf8(&msg).map_err(|_e| {
|
||||
ProtocolError::BadMessage("StartupMessage params: invalid utf-8".to_owned())
|
||||
})?;
|
||||
let s = s.strip_suffix('\0').ok_or_else(|| {
|
||||
ProtocolError::Protocol(
|
||||
"StartupMessage params: missing null terminator".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
FeStartupPacket::StartupMessage {
|
||||
major_version,
|
||||
minor_version,
|
||||
params: StartupMessageParams { params },
|
||||
params: StartupMessageParams {
|
||||
params: msg.slice_ref(s.as_bytes()),
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -187,6 +187,7 @@ impl SecondaryTenant {
|
||||
};
|
||||
|
||||
let now = SystemTime::now();
|
||||
tracing::info!("Evicting secondary layer");
|
||||
|
||||
let this = self.clone();
|
||||
|
||||
|
||||
@@ -909,6 +909,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
strftime(&layer.access_time),
|
||||
strftime(evicted_at)
|
||||
);
|
||||
self.skip_layer(layer);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -963,6 +964,15 @@ impl<'a> TenantDownloader<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
|
||||
fn skip_layer(&self, layer: HeatMapLayer) {
|
||||
let mut progress = self.secondary_state.progress.lock().unwrap();
|
||||
progress.layers_total = progress.layers_total.saturating_sub(1);
|
||||
progress.bytes_total = progress
|
||||
.bytes_total
|
||||
.saturating_sub(layer.metadata.file_size);
|
||||
}
|
||||
|
||||
async fn download_layer(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
@@ -1012,13 +1022,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
"Skipped downloading missing layer {}, raced with compaction/gc?",
|
||||
layer.name
|
||||
);
|
||||
|
||||
// If the layer is 404, adjust the progress statistics to reflect that we will not download it.
|
||||
let mut progress = self.secondary_state.progress.lock().unwrap();
|
||||
progress.layers_total = progress.layers_total.saturating_sub(1);
|
||||
progress.bytes_total = progress
|
||||
.bytes_total
|
||||
.saturating_sub(layer.metadata.file_size);
|
||||
self.skip_layer(layer);
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -125,13 +125,6 @@ typedef struct
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *wes_read;
|
||||
/*---
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_WRITABLE on 'conn'
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *wes_write;
|
||||
} PageServer;
|
||||
|
||||
static PageServer page_servers[MAX_SHARDS];
|
||||
@@ -336,11 +329,6 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
FreeWaitEventSet(shard->wes_read);
|
||||
shard->wes_read = NULL;
|
||||
}
|
||||
if (shard->wes_write)
|
||||
{
|
||||
FreeWaitEventSet(shard->wes_write);
|
||||
shard->wes_write = NULL;
|
||||
}
|
||||
if (shard->conn)
|
||||
{
|
||||
PQfinish(shard->conn);
|
||||
@@ -436,22 +424,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
return false;
|
||||
}
|
||||
|
||||
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);
|
||||
|
||||
shard->wes_write = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(shard->wes_write, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(shard->wes_write, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(shard->wes_write, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
|
||||
PQsocket(shard->conn),
|
||||
NULL, NULL);
|
||||
|
||||
shard->state = PS_Connecting_Startup;
|
||||
/* fallthrough */
|
||||
}
|
||||
@@ -460,13 +432,12 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
char *pagestream_query;
|
||||
int ps_send_query_ret;
|
||||
bool connected = false;
|
||||
|
||||
int poll_result = PGRES_POLLING_WRITING;
|
||||
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_Startup");
|
||||
|
||||
do
|
||||
{
|
||||
WaitEvent event;
|
||||
int poll_result = PQconnectPoll(shard->conn);
|
||||
|
||||
switch (poll_result)
|
||||
{
|
||||
@@ -497,25 +468,45 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
}
|
||||
case PGRES_POLLING_READING:
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
/* query cancellation, backend shutdown */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
while (true)
|
||||
{
|
||||
int rc = WaitLatchOrSocket(MyLatch,
|
||||
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE,
|
||||
PQsocket(shard->conn),
|
||||
0,
|
||||
PG_WAIT_EXTENSION);
|
||||
elog(DEBUG5, "PGRES_POLLING_READING=>%d", rc);
|
||||
if (rc & WL_LATCH_SET)
|
||||
{
|
||||
ResetLatch(MyLatch);
|
||||
/* query cancellation, backend shutdown */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
if (rc & WL_SOCKET_READABLE)
|
||||
break;
|
||||
}
|
||||
/* PQconnectPoll() handles the socket polling state updates */
|
||||
|
||||
break;
|
||||
case PGRES_POLLING_WRITING:
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(shard->wes_write, -1L, &event, 1,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
/* query cancellation, backend shutdown */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
while (true)
|
||||
{
|
||||
int rc = WaitLatchOrSocket(MyLatch,
|
||||
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_WRITEABLE,
|
||||
PQsocket(shard->conn),
|
||||
0,
|
||||
PG_WAIT_EXTENSION);
|
||||
elog(DEBUG5, "PGRES_POLLING_WRITING=>%d", rc);
|
||||
if (rc & WL_LATCH_SET)
|
||||
{
|
||||
ResetLatch(MyLatch);
|
||||
/* query cancellation, backend shutdown */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
if (rc & WL_SOCKET_WRITEABLE)
|
||||
break;
|
||||
}
|
||||
/* PQconnectPoll() handles the socket polling state updates */
|
||||
|
||||
break;
|
||||
@@ -524,12 +515,22 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
connected = true;
|
||||
break;
|
||||
}
|
||||
poll_result = PQconnectPoll(shard->conn);
|
||||
elog(DEBUG5, "PQconnectPoll=>%d", poll_result);
|
||||
}
|
||||
while (!connected);
|
||||
|
||||
/* No more polling needed; connection succeeded */
|
||||
shard->last_connect_time = GetCurrentTimestamp();
|
||||
|
||||
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);
|
||||
|
||||
|
||||
switch (neon_protocol_version)
|
||||
{
|
||||
case 2:
|
||||
|
||||
@@ -584,9 +584,9 @@ prefetch_read(PrefetchRequest *slot)
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
neon_shard_log(slot->shard_no, ERROR,
|
||||
"Incorrect prefetch read: status=%d response=%llx my=%llu receive=%llu",
|
||||
slot->status, (size_t) (void *) slot->response,
|
||||
slot->my_ring_index, MyPState->ring_receive);
|
||||
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
|
||||
slot->status, slot->response,
|
||||
(long)slot->my_ring_index, (long)MyPState->ring_receive);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive(slot->shard_no);
|
||||
@@ -606,8 +606,8 @@ prefetch_read(PrefetchRequest *slot)
|
||||
else
|
||||
{
|
||||
neon_shard_log(slot->shard_no, WARNING,
|
||||
"No response from reading prefetch entry %llu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
|
||||
slot->my_ring_index,
|
||||
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
|
||||
(long)slot->my_ring_index,
|
||||
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum);
|
||||
return false;
|
||||
@@ -3064,6 +3064,24 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
/* Make the relation look permanent again */
|
||||
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;
|
||||
|
||||
/*
|
||||
* Drop all buffers of the relation from buffer cache. They have valid contents, so it's a bit sad to throw
|
||||
* them away, but they might be marked as !BM_PERMANENT, which is no longer true and could cause
|
||||
* trouble afterwards. Also, there's a race condition with checkpoint and the mdunlink call below: The
|
||||
* checkpointer uses mdexists() to check if the buffer belongs to an unlogged relation, and then writes
|
||||
* the page to disk if it exists, but we might unlink the file in between the mdexists() and mdwrite() calls,
|
||||
* causing the write to fail.
|
||||
*/
|
||||
{
|
||||
static ForkNumber forks[] = { MAIN_FORKNUM, FSM_FORKNUM, VISIBILITYMAP_FORKNUM };
|
||||
static BlockNumber blocks[] = { 0, 0, 0 };
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
DropRelFileNodeBuffers(reln, forks, 3, blocks);
|
||||
#else
|
||||
DropRelationBuffers(reln, forks, 3, blocks);
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Remove local copy */
|
||||
rinfob = InfoBFromSMgrRel(reln);
|
||||
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
|
||||
|
||||
@@ -38,6 +38,7 @@ hmac.workspace = true
|
||||
hostname.workspace = true
|
||||
http.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
|
||||
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
|
||||
@@ -82,6 +83,7 @@ thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
|
||||
tokio-postgres.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio = { workspace = true, features = ["signal"] }
|
||||
@@ -94,10 +96,8 @@ url.workspace = true
|
||||
urlencoding.workspace = true
|
||||
utils.workspace = true
|
||||
uuid.workspace = true
|
||||
webpki-roots.workspace = true
|
||||
rustls-native-certs.workspace = true
|
||||
x509-parser.workspace = true
|
||||
native-tls.workspace = true
|
||||
postgres-native-tls.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
redis.workspace = true
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ use crate::{
|
||||
},
|
||||
stream, url,
|
||||
};
|
||||
use crate::{scram, EndpointCacheKey, EndpointId, Normalize, RoleName};
|
||||
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};
|
||||
|
||||
/// Alternative to [`std::borrow::Cow`] but doesn't need `T: ToOwned` as we don't need that functionality
|
||||
pub enum MaybeOwned<'a, T> {
|
||||
|
||||
@@ -100,6 +100,7 @@ pub(super) async fn authenticate(
|
||||
.dbname(&db_info.dbname)
|
||||
.user(&db_info.user);
|
||||
|
||||
ctx.set_dbname(db_info.dbname.into());
|
||||
ctx.set_user(db_info.user.into());
|
||||
ctx.set_project(db_info.aux.clone());
|
||||
info!("woken up a compute node");
|
||||
|
||||
@@ -11,7 +11,6 @@ use crate::{
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use smol_str::SmolStr;
|
||||
use std::{collections::HashSet, net::IpAddr, str::FromStr};
|
||||
use thiserror::Error;
|
||||
use tracing::{info, warn};
|
||||
@@ -96,13 +95,6 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
let get_param = |key| params.get(key).ok_or(MissingKey(key));
|
||||
let user: RoleName = get_param("user")?.into();
|
||||
|
||||
// record the values if we have them
|
||||
ctx.set_application(params.get("application_name").map(SmolStr::from));
|
||||
ctx.set_user(user.clone());
|
||||
if let Some(dbname) = params.get("database") {
|
||||
ctx.set_dbname(dbname.into());
|
||||
}
|
||||
|
||||
// Project name might be passed via PG's command-line options.
|
||||
let endpoint_option = params
|
||||
.options_raw()
|
||||
|
||||
@@ -557,14 +557,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
|
||||
let config::ConcurrencyLockOptions {
|
||||
shards,
|
||||
permits,
|
||||
limiter,
|
||||
epoch,
|
||||
timeout,
|
||||
} = args.wake_compute_lock.parse()?;
|
||||
info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)");
|
||||
info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
|
||||
let locks = Box::leak(Box::new(console::locks::ApiLocks::new(
|
||||
"wake_compute_lock",
|
||||
permits,
|
||||
limiter,
|
||||
shards,
|
||||
timeout,
|
||||
epoch,
|
||||
@@ -603,14 +603,19 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
|
||||
let config::ConcurrencyLockOptions {
|
||||
shards,
|
||||
permits,
|
||||
limiter,
|
||||
epoch,
|
||||
timeout,
|
||||
} = args.connect_compute_lock.parse()?;
|
||||
info!(permits, shards, ?epoch, "Using NodeLocks (connect_compute)");
|
||||
info!(
|
||||
?limiter,
|
||||
shards,
|
||||
?epoch,
|
||||
"Using NodeLocks (connect_compute)"
|
||||
);
|
||||
let connect_compute_locks = console::locks::ApiLocks::new(
|
||||
"connect_compute_lock",
|
||||
permits,
|
||||
limiter,
|
||||
shards,
|
||||
timeout,
|
||||
epoch,
|
||||
|
||||
@@ -10,11 +10,14 @@ use crate::{
|
||||
};
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use std::{io, net::SocketAddr, time::Duration};
|
||||
use rustls::{client::danger::ServerCertVerifier, pki_types::InvalidDnsNameError};
|
||||
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_postgres::tls::MakeTlsConnect;
|
||||
use tokio_postgres_rustls::MakeRustlsConnect;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
|
||||
@@ -30,7 +33,7 @@ pub enum ConnectionError {
|
||||
CouldNotConnect(#[from] io::Error),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
TlsError(#[from] native_tls::Error),
|
||||
TlsError(#[from] InvalidDnsNameError),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
WakeComputeError(#[from] WakeComputeError),
|
||||
@@ -257,7 +260,7 @@ pub struct PostgresConnection {
|
||||
/// Socket connected to a compute node.
|
||||
pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream<
|
||||
tokio::net::TcpStream,
|
||||
postgres_native_tls::TlsStream<tokio::net::TcpStream>,
|
||||
tokio_postgres_rustls::RustlsStream<tokio::net::TcpStream>,
|
||||
>,
|
||||
/// PostgreSQL connection parameters.
|
||||
pub params: std::collections::HashMap<String, String>,
|
||||
@@ -282,12 +285,23 @@ impl ConnCfg {
|
||||
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
|
||||
drop(pause);
|
||||
|
||||
let tls_connector = native_tls::TlsConnector::builder()
|
||||
.danger_accept_invalid_certs(allow_self_signed_compute)
|
||||
.build()
|
||||
.unwrap();
|
||||
let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
|
||||
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
|
||||
let client_config = if allow_self_signed_compute {
|
||||
// Allow all certificates for creating the connection
|
||||
let verifier = Arc::new(AcceptEverythingVerifier) as Arc<dyn ServerCertVerifier>;
|
||||
rustls::ClientConfig::builder()
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(verifier)
|
||||
} else {
|
||||
let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
|
||||
rustls::ClientConfig::builder().with_root_certificates(root_store)
|
||||
};
|
||||
let client_config = client_config.with_no_client_auth();
|
||||
|
||||
let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
|
||||
&mut mk_tls,
|
||||
host,
|
||||
)?;
|
||||
|
||||
// connect_raw() will not use TLS if sslmode is "disable"
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
|
||||
@@ -340,6 +354,58 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
|
||||
Some(options)
|
||||
}
|
||||
|
||||
fn load_certs() -> Result<Arc<rustls::RootCertStore>, io::Error> {
|
||||
let der_certs = rustls_native_certs::load_native_certs()?;
|
||||
let mut store = rustls::RootCertStore::empty();
|
||||
store.add_parsable_certificates(der_certs);
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AcceptEverythingVerifier;
|
||||
impl ServerCertVerifier for AcceptEverythingVerifier {
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
use rustls::SignatureScheme::*;
|
||||
// The schemes for which `SignatureScheme::supported_in_tls13` returns true.
|
||||
vec![
|
||||
ECDSA_NISTP521_SHA512,
|
||||
ECDSA_NISTP384_SHA384,
|
||||
ECDSA_NISTP256_SHA256,
|
||||
RSA_PSS_SHA512,
|
||||
RSA_PSS_SHA384,
|
||||
RSA_PSS_SHA256,
|
||||
ED25519,
|
||||
]
|
||||
}
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||
_server_name: &rustls::pki_types::ServerName<'_>,
|
||||
_ocsp_response: &[u8],
|
||||
_now: rustls::pki_types::UnixTime,
|
||||
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
|
||||
Ok(rustls::client::danger::ServerCertVerified::assertion())
|
||||
}
|
||||
fn verify_tls12_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
fn verify_tls13_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
auth::{self, backend::AuthRateLimiter},
|
||||
console::locks::ApiLocks,
|
||||
rate_limiter::RateBucketInfo,
|
||||
rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig},
|
||||
scram::threadpool::ThreadPool,
|
||||
serverless::{cancel_set::CancelSet, GlobalConnPoolOptions},
|
||||
Host,
|
||||
@@ -580,14 +580,18 @@ impl RetryConfig {
|
||||
}
|
||||
|
||||
/// Helper for cmdline cache options parsing.
|
||||
#[derive(serde::Deserialize)]
|
||||
pub struct ConcurrencyLockOptions {
|
||||
/// The number of shards the lock map should have
|
||||
pub shards: usize,
|
||||
/// The number of allowed concurrent requests for each endpoitn
|
||||
pub permits: usize,
|
||||
#[serde(flatten)]
|
||||
pub limiter: RateLimiterConfig,
|
||||
/// Garbage collection epoch
|
||||
#[serde(deserialize_with = "humantime_serde::deserialize")]
|
||||
pub epoch: Duration,
|
||||
/// Lock timeout
|
||||
#[serde(deserialize_with = "humantime_serde::deserialize")]
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
@@ -596,13 +600,18 @@ impl ConcurrencyLockOptions {
|
||||
pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
|
||||
/// Default options for [`crate::console::provider::ApiLocks`].
|
||||
pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
|
||||
"shards=64,permits=10,epoch=10m,timeout=10ms";
|
||||
"shards=64,permits=100,epoch=10m,timeout=10ms";
|
||||
|
||||
// pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
|
||||
|
||||
/// Parse lock options passed via cmdline.
|
||||
/// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
|
||||
fn parse(options: &str) -> anyhow::Result<Self> {
|
||||
let options = options.trim();
|
||||
if options.starts_with('{') && options.ends_with('}') {
|
||||
return Ok(serde_json::from_str(options)?);
|
||||
}
|
||||
|
||||
let mut shards = None;
|
||||
let mut permits = None;
|
||||
let mut epoch = None;
|
||||
@@ -629,9 +638,13 @@ impl ConcurrencyLockOptions {
|
||||
shards = Some(2);
|
||||
}
|
||||
|
||||
let permits = permits.context("missing `permits`")?;
|
||||
let out = Self {
|
||||
shards: shards.context("missing `shards`")?,
|
||||
permits: permits.context("missing `permits`")?,
|
||||
limiter: RateLimiterConfig {
|
||||
algorithm: RateLimitAlgorithm::Fixed,
|
||||
initial_limit: permits,
|
||||
},
|
||||
epoch: epoch.context("missing `epoch`")?,
|
||||
timeout: timeout.context("missing `timeout`")?,
|
||||
};
|
||||
@@ -657,6 +670,8 @@ impl FromStr for ConcurrencyLockOptions {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::rate_limiter::Aimd;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
@@ -684,36 +699,68 @@ mod tests {
|
||||
fn test_parse_lock_options() -> anyhow::Result<()> {
|
||||
let ConcurrencyLockOptions {
|
||||
epoch,
|
||||
permits,
|
||||
limiter,
|
||||
shards,
|
||||
timeout,
|
||||
} = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(10 * 60));
|
||||
assert_eq!(timeout, Duration::from_secs(1));
|
||||
assert_eq!(shards, 32);
|
||||
assert_eq!(permits, 4);
|
||||
assert_eq!(limiter.initial_limit, 4);
|
||||
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
|
||||
|
||||
let ConcurrencyLockOptions {
|
||||
epoch,
|
||||
permits,
|
||||
limiter,
|
||||
shards,
|
||||
timeout,
|
||||
} = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(60));
|
||||
assert_eq!(timeout, Duration::from_millis(100));
|
||||
assert_eq!(shards, 16);
|
||||
assert_eq!(permits, 8);
|
||||
assert_eq!(limiter.initial_limit, 8);
|
||||
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
|
||||
|
||||
let ConcurrencyLockOptions {
|
||||
epoch,
|
||||
permits,
|
||||
limiter,
|
||||
shards,
|
||||
timeout,
|
||||
} = "permits=0".parse()?;
|
||||
assert_eq!(epoch, Duration::ZERO);
|
||||
assert_eq!(timeout, Duration::ZERO);
|
||||
assert_eq!(shards, 2);
|
||||
assert_eq!(permits, 0);
|
||||
assert_eq!(limiter.initial_limit, 0);
|
||||
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_json_lock_options() -> anyhow::Result<()> {
|
||||
let ConcurrencyLockOptions {
|
||||
epoch,
|
||||
limiter,
|
||||
shards,
|
||||
timeout,
|
||||
} = r#"{"shards":32,"initial_limit":44,"aimd":{"min":5,"max":500,"inc":10,"dec":0.9,"utilisation":0.8},"epoch":"10m","timeout":"1s"}"#
|
||||
.parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(10 * 60));
|
||||
assert_eq!(timeout, Duration::from_secs(1));
|
||||
assert_eq!(shards, 32);
|
||||
assert_eq!(limiter.initial_limit, 44);
|
||||
assert_eq!(
|
||||
limiter.algorithm,
|
||||
RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 5,
|
||||
max: 500,
|
||||
dec: 0.9,
|
||||
inc: 10,
|
||||
utilisation: 0.8
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -15,11 +15,11 @@ use crate::{
|
||||
error::ReportableError,
|
||||
intern::ProjectIdInt,
|
||||
metrics::ApiLockMetrics,
|
||||
rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token},
|
||||
scram, EndpointCacheKey,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use std::{hash::Hash, sync::Arc, time::Duration};
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
|
||||
@@ -443,8 +443,8 @@ impl ApiCaches {
|
||||
/// Various caches for [`console`](super).
|
||||
pub struct ApiLocks<K> {
|
||||
name: &'static str,
|
||||
node_locks: DashMap<K, Arc<Semaphore>>,
|
||||
permits: usize,
|
||||
node_locks: DashMap<K, Arc<DynamicLimiter>>,
|
||||
config: RateLimiterConfig,
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
metrics: &'static ApiLockMetrics,
|
||||
@@ -452,8 +452,6 @@ pub struct ApiLocks<K> {
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ApiLockError {
|
||||
#[error("lock was closed")]
|
||||
AcquireError(#[from] tokio::sync::AcquireError),
|
||||
#[error("permit could not be acquired")]
|
||||
TimeoutError(#[from] tokio::time::error::Elapsed),
|
||||
}
|
||||
@@ -461,7 +459,6 @@ pub enum ApiLockError {
|
||||
impl ReportableError for ApiLockError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
ApiLockError::AcquireError(_) => crate::error::ErrorKind::Service,
|
||||
ApiLockError::TimeoutError(_) => crate::error::ErrorKind::RateLimit,
|
||||
}
|
||||
}
|
||||
@@ -470,7 +467,7 @@ impl ReportableError for ApiLockError {
|
||||
impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
pub fn new(
|
||||
name: &'static str,
|
||||
permits: usize,
|
||||
config: RateLimiterConfig,
|
||||
shards: usize,
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
@@ -479,7 +476,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
Ok(Self {
|
||||
name,
|
||||
node_locks: DashMap::with_shard_amount(shards),
|
||||
permits,
|
||||
config,
|
||||
timeout,
|
||||
epoch,
|
||||
metrics,
|
||||
@@ -487,8 +484,10 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
}
|
||||
|
||||
pub async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, ApiLockError> {
|
||||
if self.permits == 0 {
|
||||
return Ok(WakeComputePermit { permit: None });
|
||||
if self.config.initial_limit == 0 {
|
||||
return Ok(WakeComputePermit {
|
||||
permit: Token::disabled(),
|
||||
});
|
||||
}
|
||||
let now = Instant::now();
|
||||
let semaphore = {
|
||||
@@ -500,24 +499,22 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| {
|
||||
self.metrics.semaphores_registered.inc();
|
||||
Arc::new(Semaphore::new(self.permits))
|
||||
DynamicLimiter::new(self.config)
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
};
|
||||
let permit = tokio::time::timeout_at(now + self.timeout, semaphore.acquire_owned()).await;
|
||||
let permit = semaphore.acquire_deadline(now + self.timeout).await;
|
||||
|
||||
self.metrics
|
||||
.semaphore_acquire_seconds
|
||||
.observe(now.elapsed().as_secs_f64());
|
||||
|
||||
Ok(WakeComputePermit {
|
||||
permit: Some(permit??),
|
||||
})
|
||||
Ok(WakeComputePermit { permit: permit? })
|
||||
}
|
||||
|
||||
pub async fn garbage_collect_worker(&self) {
|
||||
if self.permits == 0 {
|
||||
if self.config.initial_limit == 0 {
|
||||
return;
|
||||
}
|
||||
let mut interval =
|
||||
@@ -547,12 +544,21 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
}
|
||||
|
||||
pub struct WakeComputePermit {
|
||||
// None if the lock is disabled
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
permit: Token,
|
||||
}
|
||||
|
||||
impl WakeComputePermit {
|
||||
pub fn should_check_cache(&self) -> bool {
|
||||
self.permit.is_some()
|
||||
!self.permit.is_disabled()
|
||||
}
|
||||
pub fn release(self, outcome: Outcome) {
|
||||
self.permit.release(outcome)
|
||||
}
|
||||
pub fn release_result<T, E>(self, res: Result<T, E>) -> Result<T, E> {
|
||||
match res {
|
||||
Ok(_) => self.release(Outcome::Success),
|
||||
Err(_) => self.release(Outcome::Overload),
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use crate::{
|
||||
http,
|
||||
metrics::{CacheOutcome, Metrics},
|
||||
rate_limiter::EndpointRateLimiter,
|
||||
scram, EndpointCacheKey, Normalize,
|
||||
scram, EndpointCacheKey,
|
||||
};
|
||||
use crate::{cache::Cached, context::RequestMonitoring};
|
||||
use futures::TryFutureExt;
|
||||
@@ -281,14 +281,6 @@ impl super::Api for Api {
|
||||
return Ok(cached);
|
||||
}
|
||||
|
||||
// check rate limit
|
||||
if !self
|
||||
.wake_compute_endpoint_rate_limiter
|
||||
.check(user_info.endpoint.normalize().into(), 1)
|
||||
{
|
||||
return Err(WakeComputeError::TooManyConnections);
|
||||
}
|
||||
|
||||
let permit = self.locks.get_permit(&key).await?;
|
||||
|
||||
// after getting back a permit - it's possible the cache was filled
|
||||
@@ -301,7 +293,16 @@ impl super::Api for Api {
|
||||
}
|
||||
}
|
||||
|
||||
let mut node = self.do_wake_compute(ctx, user_info).await?;
|
||||
// check rate limit
|
||||
if !self
|
||||
.wake_compute_endpoint_rate_limiter
|
||||
.check(user_info.endpoint.normalize_intern(), 1)
|
||||
{
|
||||
info!(key = &*key, "found cached compute node info");
|
||||
return Err(WakeComputeError::TooManyConnections);
|
||||
}
|
||||
|
||||
let mut node = permit.release_result(self.do_wake_compute(ctx, user_info).await)?;
|
||||
ctx.set_project(node.aux.clone());
|
||||
let cold_start_info = node.aux.cold_start_info;
|
||||
info!("woken up a compute node");
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
use chrono::Utc;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use smol_str::SmolStr;
|
||||
use std::net::IpAddr;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -46,6 +47,7 @@ pub struct RequestMonitoring {
|
||||
pub(crate) auth_method: Option<AuthMethod>,
|
||||
success: bool,
|
||||
pub(crate) cold_start_info: ColdStartInfo,
|
||||
pg_options: Option<StartupMessageParams>,
|
||||
|
||||
// extra
|
||||
// This sender is here to keep the request monitoring channel open while requests are taking place.
|
||||
@@ -102,6 +104,7 @@ impl RequestMonitoring {
|
||||
success: false,
|
||||
rejected: None,
|
||||
cold_start_info: ColdStartInfo::Unknown,
|
||||
pg_options: None,
|
||||
|
||||
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
|
||||
disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
|
||||
@@ -132,6 +135,18 @@ impl RequestMonitoring {
|
||||
self.latency_timer.cold_start_info(info);
|
||||
}
|
||||
|
||||
pub fn set_db_options(&mut self, options: StartupMessageParams) {
|
||||
self.set_application(options.get("application_name").map(SmolStr::from));
|
||||
if let Some(user) = options.get("user") {
|
||||
self.set_user(user.into());
|
||||
}
|
||||
if let Some(dbname) = options.get("database") {
|
||||
self.set_dbname(dbname.into());
|
||||
}
|
||||
|
||||
self.pg_options = Some(options);
|
||||
}
|
||||
|
||||
pub fn set_project(&mut self, x: MetricsAuxInfo) {
|
||||
if self.endpoint_id.is_none() {
|
||||
self.set_endpoint_id(x.endpoint_id.as_str().into())
|
||||
@@ -155,8 +170,10 @@ impl RequestMonitoring {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_application(&mut self, app: Option<SmolStr>) {
|
||||
self.application = app.or_else(|| self.application.clone());
|
||||
fn set_application(&mut self, app: Option<SmolStr>) {
|
||||
if let Some(app) = app {
|
||||
self.application = Some(app);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_dbname(&mut self, dbname: DbName) {
|
||||
|
||||
@@ -13,7 +13,9 @@ use parquet::{
|
||||
},
|
||||
record::RecordWriter,
|
||||
};
|
||||
use pq_proto::StartupMessageParams;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
|
||||
use serde::ser::SerializeMap;
|
||||
use tokio::{sync::mpsc, time};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, Span};
|
||||
@@ -87,6 +89,7 @@ pub struct RequestData {
|
||||
database: Option<String>,
|
||||
project: Option<String>,
|
||||
branch: Option<String>,
|
||||
pg_options: Option<String>,
|
||||
auth_method: Option<&'static str>,
|
||||
error: Option<&'static str>,
|
||||
/// Success is counted if we form a HTTP response with sql rows inside
|
||||
@@ -101,6 +104,23 @@ pub struct RequestData {
|
||||
disconnect_timestamp: Option<chrono::NaiveDateTime>,
|
||||
}
|
||||
|
||||
struct Options<'a> {
|
||||
options: &'a StartupMessageParams,
|
||||
}
|
||||
|
||||
impl<'a> serde::Serialize for Options<'a> {
|
||||
fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let mut state = s.serialize_map(None)?;
|
||||
for (k, v) in self.options.iter() {
|
||||
state.serialize_entry(k, v)?;
|
||||
}
|
||||
state.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&RequestMonitoring> for RequestData {
|
||||
fn from(value: &RequestMonitoring) -> Self {
|
||||
Self {
|
||||
@@ -113,6 +133,10 @@ impl From<&RequestMonitoring> for RequestData {
|
||||
database: value.dbname.as_deref().map(String::from),
|
||||
project: value.project.as_deref().map(String::from),
|
||||
branch: value.branch.as_deref().map(String::from),
|
||||
pg_options: value
|
||||
.pg_options
|
||||
.as_ref()
|
||||
.and_then(|options| serde_json::to_string(&Options { options }).ok()),
|
||||
auth_method: value.auth_method.as_ref().map(|x| match x {
|
||||
super::AuthMethod::Web => "web",
|
||||
super::AuthMethod::ScramSha256 => "scram_sha_256",
|
||||
@@ -494,6 +518,7 @@ mod tests {
|
||||
database: Some(hex::encode(rng.gen::<[u8; 16]>())),
|
||||
project: Some(hex::encode(rng.gen::<[u8; 16]>())),
|
||||
branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
|
||||
pg_options: None,
|
||||
auth_method: None,
|
||||
protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
|
||||
region: "us-east-1",
|
||||
@@ -570,15 +595,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1315314, 3, 6000),
|
||||
(1315307, 3, 6000),
|
||||
(1315367, 3, 6000),
|
||||
(1315324, 3, 6000),
|
||||
(1315454, 3, 6000),
|
||||
(1315296, 3, 6000),
|
||||
(1315088, 3, 6000),
|
||||
(1315324, 3, 6000),
|
||||
(438713, 1, 2000)
|
||||
(1315874, 3, 6000),
|
||||
(1315867, 3, 6000),
|
||||
(1315927, 3, 6000),
|
||||
(1315884, 3, 6000),
|
||||
(1316014, 3, 6000),
|
||||
(1315856, 3, 6000),
|
||||
(1315648, 3, 6000),
|
||||
(1315884, 3, 6000),
|
||||
(438913, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -608,11 +633,11 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1222212, 5, 10000),
|
||||
(1228362, 5, 10000),
|
||||
(1230156, 5, 10000),
|
||||
(1229518, 5, 10000),
|
||||
(1220796, 5, 10000)
|
||||
(1223214, 5, 10000),
|
||||
(1229364, 5, 10000),
|
||||
(1231158, 5, 10000),
|
||||
(1230520, 5, 10000),
|
||||
(1221798, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -644,11 +669,11 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1207859, 5, 10000),
|
||||
(1207590, 5, 10000),
|
||||
(1207883, 5, 10000),
|
||||
(1207871, 5, 10000),
|
||||
(1208126, 5, 10000)
|
||||
(1208861, 5, 10000),
|
||||
(1208592, 5, 10000),
|
||||
(1208885, 5, 10000),
|
||||
(1208873, 5, 10000),
|
||||
(1209128, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -673,15 +698,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1315314, 3, 6000),
|
||||
(1315307, 3, 6000),
|
||||
(1315367, 3, 6000),
|
||||
(1315324, 3, 6000),
|
||||
(1315454, 3, 6000),
|
||||
(1315296, 3, 6000),
|
||||
(1315088, 3, 6000),
|
||||
(1315324, 3, 6000),
|
||||
(438713, 1, 2000)
|
||||
(1315874, 3, 6000),
|
||||
(1315867, 3, 6000),
|
||||
(1315927, 3, 6000),
|
||||
(1315884, 3, 6000),
|
||||
(1316014, 3, 6000),
|
||||
(1315856, 3, 6000),
|
||||
(1315648, 3, 6000),
|
||||
(1315884, 3, 6000),
|
||||
(438913, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -718,7 +743,7 @@ mod tests {
|
||||
// files are smaller than the size threshold, but they took too long to fill so were flushed early
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[(659462, 2, 3001), (659176, 2, 3000), (658972, 2, 2999)]
|
||||
[(659836, 2, 3001), (659550, 2, 3000), (659346, 2, 2999)]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use std::convert::Infallible;
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use intern::{EndpointIdInt, EndpointIdTag, InternId};
|
||||
use tokio::task::JoinError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
@@ -129,20 +130,22 @@ macro_rules! smol_str_wrapper {
|
||||
|
||||
const POOLER_SUFFIX: &str = "-pooler";
|
||||
|
||||
pub trait Normalize {
|
||||
fn normalize(&self) -> Self;
|
||||
}
|
||||
|
||||
impl<S: Clone + AsRef<str> + From<String>> Normalize for S {
|
||||
impl EndpointId {
|
||||
fn normalize(&self) -> Self {
|
||||
if self.as_ref().ends_with(POOLER_SUFFIX) {
|
||||
let mut s = self.as_ref().to_string();
|
||||
s.truncate(s.len() - POOLER_SUFFIX.len());
|
||||
s.into()
|
||||
if let Some(stripped) = self.as_ref().strip_suffix(POOLER_SUFFIX) {
|
||||
stripped.into()
|
||||
} else {
|
||||
self.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_intern(&self) -> EndpointIdInt {
|
||||
if let Some(stripped) = self.as_ref().strip_suffix(POOLER_SUFFIX) {
|
||||
EndpointIdTag::get_interner().get_or_intern(stripped)
|
||||
} else {
|
||||
self.into()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 90% of role name strings are 20 characters or less.
|
||||
|
||||
@@ -267,6 +267,8 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
};
|
||||
drop(pause);
|
||||
|
||||
ctx.set_db_options(params.clone());
|
||||
|
||||
let hostname = mode.hostname(stream.get_ref());
|
||||
|
||||
let common_names = tls.map(|tls| &tls.common_names);
|
||||
|
||||
@@ -84,8 +84,8 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
timeout: time::Duration,
|
||||
) -> Result<PostgresConnection, Self::Error> {
|
||||
let host = node_info.config.get_host()?;
|
||||
let _permit = self.locks.get_permit(&host).await?;
|
||||
node_info.connect(ctx, timeout).await
|
||||
let permit = self.locks.get_permit(&host).await?;
|
||||
permit.release_result(node_info.connect(ctx, timeout).await)
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
|
||||
|
||||
@@ -1,2 +1,6 @@
|
||||
mod limit_algorithm;
|
||||
mod limiter;
|
||||
pub use limit_algorithm::{
|
||||
aimd::Aimd, DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token,
|
||||
};
|
||||
pub use limiter::{BucketRateLimiter, EndpointRateLimiter, GlobalRateLimiter, RateBucketInfo};
|
||||
|
||||
275
proxy/src/rate_limiter/limit_algorithm.rs
Normal file
275
proxy/src/rate_limiter/limit_algorithm.rs
Normal file
@@ -0,0 +1,275 @@
|
||||
//! Algorithms for controlling concurrency limits.
|
||||
use parking_lot::Mutex;
|
||||
use std::{pin::pin, sync::Arc, time::Duration};
|
||||
use tokio::{
|
||||
sync::Notify,
|
||||
time::{error::Elapsed, timeout_at, Instant},
|
||||
};
|
||||
|
||||
use self::aimd::Aimd;
|
||||
|
||||
pub mod aimd;
|
||||
|
||||
/// Whether a job succeeded or failed as a result of congestion/overload.
|
||||
///
|
||||
/// Errors not considered to be caused by overload should be ignored.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum Outcome {
|
||||
/// The job succeeded, or failed in a way unrelated to overload.
|
||||
Success,
|
||||
/// The job failed because of overload, e.g. it timed out or an explicit backpressure signal
|
||||
/// was observed.
|
||||
Overload,
|
||||
}
|
||||
|
||||
/// An algorithm for controlling a concurrency limit.
|
||||
pub trait LimitAlgorithm: Send + Sync + 'static {
|
||||
/// Update the concurrency limit in response to a new job completion.
|
||||
fn update(&self, old_limit: usize, sample: Sample) -> usize;
|
||||
}
|
||||
|
||||
/// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
|
||||
pub struct Sample {
|
||||
pub(crate) latency: Duration,
|
||||
/// Jobs in flight when the sample was taken.
|
||||
pub(crate) in_flight: usize,
|
||||
pub(crate) outcome: Outcome,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum RateLimitAlgorithm {
|
||||
#[default]
|
||||
Fixed,
|
||||
Aimd {
|
||||
#[serde(flatten)]
|
||||
conf: Aimd,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct Fixed;
|
||||
|
||||
impl LimitAlgorithm for Fixed {
|
||||
fn update(&self, old_limit: usize, _sample: Sample) -> usize {
|
||||
old_limit
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
|
||||
pub struct RateLimiterConfig {
|
||||
#[serde(flatten)]
|
||||
pub algorithm: RateLimitAlgorithm,
|
||||
pub initial_limit: usize,
|
||||
}
|
||||
|
||||
impl RateLimiterConfig {
|
||||
pub fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
|
||||
match self.algorithm {
|
||||
RateLimitAlgorithm::Fixed => Box::new(Fixed),
|
||||
RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LimiterInner {
|
||||
alg: Box<dyn LimitAlgorithm>,
|
||||
available: usize,
|
||||
limit: usize,
|
||||
in_flight: usize,
|
||||
}
|
||||
|
||||
impl LimiterInner {
|
||||
fn update(&mut self, latency: Duration, outcome: Option<Outcome>) {
|
||||
if let Some(outcome) = outcome {
|
||||
let sample = Sample {
|
||||
latency,
|
||||
in_flight: self.in_flight,
|
||||
outcome,
|
||||
};
|
||||
self.limit = self.alg.update(self.limit, sample);
|
||||
}
|
||||
}
|
||||
|
||||
fn take(&mut self, ready: &Notify) -> Option<()> {
|
||||
if self.available > 1 {
|
||||
self.available -= 1;
|
||||
self.in_flight += 1;
|
||||
|
||||
// tell the next in the queue that there is a permit ready
|
||||
if self.available > 1 {
|
||||
ready.notify_one();
|
||||
}
|
||||
Some(())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Limits the number of concurrent jobs.
|
||||
///
|
||||
/// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
|
||||
/// token once the job is finished.
|
||||
///
|
||||
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
|
||||
/// caused by overload (loss).
|
||||
pub struct DynamicLimiter {
|
||||
config: RateLimiterConfig,
|
||||
inner: Mutex<LimiterInner>,
|
||||
// to notify when a token is available
|
||||
ready: Notify,
|
||||
}
|
||||
|
||||
/// A concurrency token, required to run a job.
|
||||
///
|
||||
/// Release the token back to the [`DynamicLimiter`] after the job is complete.
|
||||
pub struct Token {
|
||||
start: Instant,
|
||||
limiter: Option<Arc<DynamicLimiter>>,
|
||||
}
|
||||
|
||||
/// A snapshot of the state of the [`DynamicLimiter`].
|
||||
///
|
||||
/// Not guaranteed to be consistent under high concurrency.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct LimiterState {
|
||||
limit: usize,
|
||||
in_flight: usize,
|
||||
}
|
||||
|
||||
impl DynamicLimiter {
|
||||
/// Create a limiter with a given limit control algorithm.
|
||||
pub fn new(config: RateLimiterConfig) -> Arc<Self> {
|
||||
let ready = Notify::new();
|
||||
ready.notify_one();
|
||||
|
||||
Arc::new(Self {
|
||||
inner: Mutex::new(LimiterInner {
|
||||
alg: config.create_rate_limit_algorithm(),
|
||||
available: config.initial_limit,
|
||||
limit: config.initial_limit,
|
||||
in_flight: 0,
|
||||
}),
|
||||
ready,
|
||||
config,
|
||||
})
|
||||
}
|
||||
|
||||
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
|
||||
///
|
||||
/// Returns `None` if there are none available after `duration`.
|
||||
pub async fn acquire_timeout(self: &Arc<Self>, duration: Duration) -> Result<Token, Elapsed> {
|
||||
self.acquire_deadline(Instant::now() + duration).await
|
||||
}
|
||||
|
||||
/// Try to acquire a concurrency [Token], waiting until `deadline` if there are none available.
|
||||
///
|
||||
/// Returns `None` if there are none available after `deadline`.
|
||||
pub async fn acquire_deadline(self: &Arc<Self>, deadline: Instant) -> Result<Token, Elapsed> {
|
||||
if self.config.initial_limit == 0 {
|
||||
// If the rate limiter is disabled, we can always acquire a token.
|
||||
Ok(Token::disabled())
|
||||
} else {
|
||||
let mut notified = pin!(self.ready.notified());
|
||||
let mut ready = notified.as_mut().enable();
|
||||
loop {
|
||||
let mut limit = None;
|
||||
if ready {
|
||||
let mut inner = self.inner.lock();
|
||||
if inner.take(&self.ready).is_some() {
|
||||
break Ok(Token::new(self.clone()));
|
||||
}
|
||||
limit = Some(inner.limit);
|
||||
}
|
||||
match timeout_at(deadline, notified.as_mut()).await {
|
||||
Ok(()) => ready = true,
|
||||
Err(e) => {
|
||||
let limit = limit.unwrap_or_else(|| self.inner.lock().limit);
|
||||
tracing::info!(limit, "could not acquire token in time");
|
||||
break Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the concurrency [Token], along with the outcome of the job.
|
||||
///
|
||||
/// The [Outcome] of the job, and the time taken to perform it, may be used
|
||||
/// to update the concurrency limit.
|
||||
///
|
||||
/// Set the outcome to `None` to ignore the job.
|
||||
fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
|
||||
tracing::info!("outcome is {:?}", outcome);
|
||||
if self.config.initial_limit == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
|
||||
inner.update(start.elapsed(), outcome);
|
||||
if inner.in_flight < inner.limit {
|
||||
inner.available = inner.limit - inner.in_flight;
|
||||
// At least 1 permit is now available
|
||||
self.ready.notify_one();
|
||||
}
|
||||
|
||||
inner.in_flight -= 1;
|
||||
}
|
||||
|
||||
/// The current state of the limiter.
|
||||
pub fn state(&self) -> LimiterState {
|
||||
let inner = self.inner.lock();
|
||||
LimiterState {
|
||||
limit: inner.limit,
|
||||
in_flight: inner.in_flight,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Token {
|
||||
fn new(limiter: Arc<DynamicLimiter>) -> Self {
|
||||
Self {
|
||||
start: Instant::now(),
|
||||
limiter: Some(limiter),
|
||||
}
|
||||
}
|
||||
pub fn disabled() -> Self {
|
||||
Self {
|
||||
start: Instant::now(),
|
||||
limiter: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_disabled(&self) -> bool {
|
||||
self.limiter.is_none()
|
||||
}
|
||||
|
||||
pub fn release(mut self, outcome: Outcome) {
|
||||
self.release_mut(Some(outcome))
|
||||
}
|
||||
|
||||
pub fn release_mut(&mut self, outcome: Option<Outcome>) {
|
||||
if let Some(limiter) = self.limiter.take() {
|
||||
limiter.release_inner(self.start, outcome);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Token {
|
||||
fn drop(&mut self) {
|
||||
self.release_mut(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl LimiterState {
|
||||
/// The current concurrency limit.
|
||||
pub fn limit(&self) -> usize {
|
||||
self.limit
|
||||
}
|
||||
/// The number of jobs in flight.
|
||||
pub fn in_flight(&self) -> usize {
|
||||
self.in_flight
|
||||
}
|
||||
}
|
||||
184
proxy/src/rate_limiter/limit_algorithm/aimd.rs
Normal file
184
proxy/src/rate_limiter/limit_algorithm/aimd.rs
Normal file
@@ -0,0 +1,184 @@
|
||||
use std::usize;
|
||||
|
||||
use super::{LimitAlgorithm, Outcome, Sample};
|
||||
|
||||
/// Loss-based congestion avoidance.
|
||||
///
|
||||
/// Additive-increase, multiplicative decrease.
|
||||
///
|
||||
/// Adds available currency when:
|
||||
/// 1. no load-based errors are observed, and
|
||||
/// 2. the utilisation of the current limit is high.
|
||||
///
|
||||
/// Reduces available concurrency by a factor when load-based errors are detected.
|
||||
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
|
||||
pub struct Aimd {
|
||||
/// Minimum limit for AIMD algorithm.
|
||||
pub min: usize,
|
||||
/// Maximum limit for AIMD algorithm.
|
||||
pub max: usize,
|
||||
/// Decrease AIMD decrease by value in case of error.
|
||||
pub dec: f32,
|
||||
/// Increase AIMD increase by value in case of success.
|
||||
pub inc: usize,
|
||||
/// A threshold below which the limit won't be increased.
|
||||
pub utilisation: f32,
|
||||
}
|
||||
|
||||
impl LimitAlgorithm for Aimd {
|
||||
fn update(&self, old_limit: usize, sample: Sample) -> usize {
|
||||
use Outcome::*;
|
||||
match sample.outcome {
|
||||
Success => {
|
||||
let utilisation = sample.in_flight as f32 / old_limit as f32;
|
||||
|
||||
if utilisation > self.utilisation {
|
||||
let limit = old_limit + self.inc;
|
||||
let increased_limit = limit.clamp(self.min, self.max);
|
||||
if increased_limit > old_limit {
|
||||
tracing::info!(increased_limit, "limit increased");
|
||||
}
|
||||
|
||||
increased_limit
|
||||
} else {
|
||||
old_limit
|
||||
}
|
||||
}
|
||||
Overload => {
|
||||
let limit = old_limit as f32 * self.dec;
|
||||
|
||||
// Floor instead of round, so the limit reduces even with small numbers.
|
||||
// E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
|
||||
let limit = limit.floor() as usize;
|
||||
|
||||
limit.clamp(self.min, self.max)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::rate_limiter::limit_algorithm::{
|
||||
DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn should_decrease_limit_on_overload() {
|
||||
let config = RateLimiterConfig {
|
||||
initial_limit: 10,
|
||||
algorithm: RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 1,
|
||||
max: 1500,
|
||||
inc: 10,
|
||||
dec: 0.5,
|
||||
utilisation: 0.8,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
let limiter = DynamicLimiter::new(config);
|
||||
|
||||
let token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
token.release(Outcome::Overload);
|
||||
|
||||
assert_eq!(limiter.state().limit(), 5, "overload: decrease");
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
|
||||
let config = RateLimiterConfig {
|
||||
initial_limit: 4,
|
||||
algorithm: RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 1,
|
||||
max: 1500,
|
||||
inc: 1,
|
||||
dec: 0.5,
|
||||
utilisation: 0.5,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
let limiter = DynamicLimiter::new(config);
|
||||
|
||||
let token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
let _token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
let _token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
token.release(Outcome::Success);
|
||||
assert_eq!(limiter.state().limit(), 5, "success: increase");
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
|
||||
let config = RateLimiterConfig {
|
||||
initial_limit: 4,
|
||||
algorithm: RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 1,
|
||||
max: 1500,
|
||||
inc: 10,
|
||||
dec: 0.5,
|
||||
utilisation: 0.5,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
let limiter = DynamicLimiter::new(config);
|
||||
|
||||
let token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
token.release(Outcome::Success);
|
||||
assert_eq!(
|
||||
limiter.state().limit(),
|
||||
4,
|
||||
"success: ignore when < half limit"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn should_not_change_limit_when_no_outcome() {
|
||||
let config = RateLimiterConfig {
|
||||
initial_limit: 10,
|
||||
algorithm: RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 1,
|
||||
max: 1500,
|
||||
inc: 10,
|
||||
dec: 0.5,
|
||||
utilisation: 0.5,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
let limiter = DynamicLimiter::new(config);
|
||||
|
||||
let token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
drop(token);
|
||||
assert_eq!(limiter.state().limit(), 10, "ignore");
|
||||
}
|
||||
}
|
||||
@@ -232,9 +232,9 @@ impl ConnectMechanism for TokioMechanism {
|
||||
.connect_timeout(timeout);
|
||||
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
|
||||
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
|
||||
let res = config.connect(tokio_postgres::NoTls).await;
|
||||
drop(pause);
|
||||
drop(permit);
|
||||
let (client, connection) = permit.release_result(res)?;
|
||||
|
||||
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
|
||||
Ok(poll_client(
|
||||
|
||||
@@ -17,6 +17,7 @@ use hyper1::http::HeaderValue;
|
||||
use hyper1::Response;
|
||||
use hyper1::StatusCode;
|
||||
use hyper1::{HeaderMap, Request};
|
||||
use pq_proto::StartupMessageParamsBuilder;
|
||||
use serde_json::json;
|
||||
use serde_json::Value;
|
||||
use tokio::time;
|
||||
@@ -192,13 +193,13 @@ fn get_conn_info(
|
||||
|
||||
let mut options = Option::None;
|
||||
|
||||
let mut params = StartupMessageParamsBuilder::default();
|
||||
params.insert("user", &username);
|
||||
params.insert("database", &dbname);
|
||||
for (key, value) in pairs {
|
||||
match &*key {
|
||||
"options" => {
|
||||
options = Some(NeonOptions::parse_options_raw(&value));
|
||||
}
|
||||
"application_name" => ctx.set_application(Some(value.into())),
|
||||
_ => {}
|
||||
params.insert(&key, &value);
|
||||
if key == "options" {
|
||||
options = Some(NeonOptions::parse_options_raw(&value));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
exclude = [
|
||||
"^vendor/",
|
||||
"^target/",
|
||||
"test_runner/performance/pgvector/loaddata.py",
|
||||
]
|
||||
check_untyped_defs = true
|
||||
# Help mypy find imports when running against list of individual files.
|
||||
|
||||
@@ -22,8 +22,7 @@ serde_with.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
utils.workspace = true
|
||||
async-stream.workspace = true
|
||||
native-tls.workspace = true
|
||||
postgres-native-tls.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
@@ -31,6 +30,9 @@ tokio-util = { workspace = true }
|
||||
futures-util.workspace = true
|
||||
itertools.workspace = true
|
||||
camino.workspace = true
|
||||
rustls.workspace = true
|
||||
rustls-native-certs.workspace = true
|
||||
once_cell.workspace = true
|
||||
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use std::{collections::HashSet, str::FromStr};
|
||||
use std::{collections::HashSet, str::FromStr, sync::Arc};
|
||||
|
||||
use aws_sdk_s3::Client;
|
||||
use futures::stream::{StreamExt, TryStreamExt};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::{XLogFileName, PG_TLI};
|
||||
use serde::Serialize;
|
||||
@@ -70,9 +71,12 @@ pub async fn scan_safekeeper_metadata(
|
||||
"checking bucket {}, region {}, dump_db_table {}",
|
||||
bucket_config.bucket, bucket_config.region, dump_db_table
|
||||
);
|
||||
// Use the native TLS implementation (Neon requires TLS)
|
||||
let tls_connector =
|
||||
postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new().unwrap());
|
||||
// Use rustls (Neon requires TLS)
|
||||
let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
|
||||
let client_config = rustls::ClientConfig::builder()
|
||||
.with_root_certificates(root_store)
|
||||
.with_no_client_auth();
|
||||
let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
|
||||
// The connection object performs the actual communication with the database,
|
||||
// so spawn it off to run on its own.
|
||||
@@ -234,3 +238,11 @@ async fn check_timeline(
|
||||
is_deleted: false,
|
||||
})
|
||||
}
|
||||
|
||||
fn load_certs() -> Result<Arc<rustls::RootCertStore>, std::io::Error> {
|
||||
let der_certs = rustls_native_certs::load_native_certs()?;
|
||||
let mut store = rustls::RootCertStore::empty();
|
||||
store.add_parsable_certificates(der_certs);
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
|
||||
|
||||
47
test_runner/performance/pgvector/HNSW_build.sql
Normal file
47
test_runner/performance/pgvector/HNSW_build.sql
Normal file
@@ -0,0 +1,47 @@
|
||||
|
||||
\set ECHO queries
|
||||
\timing
|
||||
|
||||
-- prepare test table
|
||||
DROP TABLE IF EXISTS hnsw_test_table;
|
||||
CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;
|
||||
INSERT INTO hnsw_test_table SELECT * FROM documents;
|
||||
CREATE INDEX ON hnsw_test_table (_id); -- needed later for random tuple queries
|
||||
-- tune index build params
|
||||
SET max_parallel_maintenance_workers = 7;
|
||||
SET maintenance_work_mem = '8GB';
|
||||
-- create HNSW index for the supported distance metrics
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_cosine_ops);
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_ip_ops);
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);
|
||||
-- note: in a second psql session we can monitor the progress of the index build phases using
|
||||
-- the following query:
|
||||
-- SELECT phase, round(100.0 * blocks_done / nullif(blocks_total, 0), 1) AS "%" FROM pg_stat_progress_create_index;
|
||||
|
||||
-- show all indexes built on the table
|
||||
SELECT
|
||||
idx.relname AS index_name,
|
||||
tbl.relname AS table_name,
|
||||
am.amname AS access_method,
|
||||
a.attname AS column_name,
|
||||
opc.opcname AS operator_class
|
||||
FROM
|
||||
pg_index i
|
||||
JOIN
|
||||
pg_class idx ON idx.oid = i.indexrelid
|
||||
JOIN
|
||||
pg_class tbl ON tbl.oid = i.indrelid
|
||||
JOIN
|
||||
pg_am am ON am.oid = idx.relam
|
||||
JOIN
|
||||
pg_attribute a ON a.attrelid = tbl.oid AND a.attnum = ANY(i.indkey)
|
||||
JOIN
|
||||
pg_opclass opc ON opc.oid = i.indclass[0]
|
||||
WHERE
|
||||
tbl.relname = 'hnsw_test_table'
|
||||
AND a.attname = 'embeddings';
|
||||
|
||||
-- show table sizes
|
||||
\dt+
|
||||
52
test_runner/performance/pgvector/IVFFLAT_build.sql
Normal file
52
test_runner/performance/pgvector/IVFFLAT_build.sql
Normal file
@@ -0,0 +1,52 @@
|
||||
|
||||
\set ECHO queries
|
||||
\timing
|
||||
|
||||
-- prepare test table
|
||||
DROP TABLE IF EXISTS ivfflat_test_table;
|
||||
CREATE TABLE ivfflat_test_table AS TABLE documents WITH NO DATA;
|
||||
INSERT INTO ivfflat_test_table SELECT * FROM documents;
|
||||
CREATE INDEX ON ivfflat_test_table (_id); -- needed later for random tuple queries
|
||||
-- tune index build params
|
||||
SET max_parallel_maintenance_workers = 7;
|
||||
SET maintenance_work_mem = '8GB';
|
||||
-- create ivfflat index for the supported distance metrics
|
||||
-- the formulat for lists is # rows / 1000 or sqrt(# rows) if # rows > 1 million
|
||||
-- we have 1 million embeddings of vector size 1536 in column embeddings of table documents
|
||||
-- so we use 1000 lists
|
||||
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_l2_ops) WITH (lists = 1000);
|
||||
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_ip_ops) WITH (lists = 1000);
|
||||
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_cosine_ops) WITH (lists = 1000);
|
||||
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings::halfvec(1536) halfvec_l2_ops) WITH (lists = 1000);
|
||||
CREATE INDEX ON ivfflat_test_table
|
||||
USING ivfflat ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops) WITH (lists = 1000);
|
||||
|
||||
\d ivfflat_test_table
|
||||
|
||||
|
||||
-- show all indexes built on the table
|
||||
SELECT
|
||||
idx.relname AS index_name,
|
||||
tbl.relname AS table_name,
|
||||
am.amname AS access_method,
|
||||
a.attname AS column_name,
|
||||
opc.opcname AS operator_class
|
||||
FROM
|
||||
pg_index i
|
||||
JOIN
|
||||
pg_class idx ON idx.oid = i.indexrelid
|
||||
JOIN
|
||||
pg_class tbl ON tbl.oid = i.indrelid
|
||||
JOIN
|
||||
pg_am am ON am.oid = idx.relam
|
||||
JOIN
|
||||
pg_attribute a ON a.attrelid = tbl.oid AND a.attnum = ANY(i.indkey)
|
||||
JOIN
|
||||
pg_opclass opc ON opc.oid = i.indclass[0]
|
||||
WHERE
|
||||
tbl.relname = 'ivfflat_test_table'
|
||||
AND a.attname = 'embeddings';
|
||||
-- show table sizes
|
||||
\dt+
|
||||
|
||||
|
||||
55
test_runner/performance/pgvector/README.md
Normal file
55
test_runner/performance/pgvector/README.md
Normal file
@@ -0,0 +1,55 @@
|
||||
# Source of the dataset for pgvector tests
|
||||
|
||||
This readme was copied from https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-1M
|
||||
|
||||
## Download the parquet files
|
||||
|
||||
```bash
|
||||
brew install git-lfs
|
||||
git-lfs clone https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-1M
|
||||
```
|
||||
|
||||
## Load into postgres:
|
||||
|
||||
see loaddata.py in this directory
|
||||
|
||||
## Rest of dataset card as on huggingface
|
||||
|
||||
---
|
||||
dataset_info:
|
||||
features:
|
||||
- name: _id
|
||||
dtype: string
|
||||
- name: title
|
||||
dtype: string
|
||||
- name: text
|
||||
dtype: string
|
||||
- name: text-embedding-3-large-1536-embedding
|
||||
sequence: float64
|
||||
splits:
|
||||
- name: train
|
||||
num_bytes: 12679725776
|
||||
num_examples: 1000000
|
||||
download_size: 9551862565
|
||||
dataset_size: 12679725776
|
||||
configs:
|
||||
- config_name: default
|
||||
data_files:
|
||||
- split: train
|
||||
path: data/train-*
|
||||
license: mit
|
||||
task_categories:
|
||||
- feature-extraction
|
||||
language:
|
||||
- en
|
||||
size_categories:
|
||||
- 1M<n<10M
|
||||
---
|
||||
|
||||
|
||||
1M OpenAI Embeddings: text-embedding-3-large 1536 dimensions
|
||||
|
||||
- Created: February 2024.
|
||||
- Text used for Embedding: title (string) + text (string)
|
||||
- Embedding Model: OpenAI text-embedding-3-large
|
||||
- This dataset was generated from the first 1M entries of https://huggingface.co/datasets/BeIR/dbpedia-entity, extracted by @KShivendu_
|
||||
72
test_runner/performance/pgvector/loaddata.py
Normal file
72
test_runner/performance/pgvector/loaddata.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import psycopg2
|
||||
from pgvector.psycopg2 import register_vector
|
||||
from psycopg2.extras import execute_values
|
||||
|
||||
|
||||
def print_usage():
|
||||
print("Usage: loaddata.py <CONNSTR> <DATADIR>")
|
||||
|
||||
|
||||
def main(conn_str, directory_path):
|
||||
# Connection to PostgreSQL
|
||||
with psycopg2.connect(conn_str) as conn:
|
||||
with conn.cursor() as cursor:
|
||||
# Run SQL statements
|
||||
cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;")
|
||||
register_vector(conn)
|
||||
cursor.execute("DROP TABLE IF EXISTS documents;")
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE TABLE documents (
|
||||
_id TEXT PRIMARY KEY,
|
||||
title TEXT,
|
||||
text TEXT,
|
||||
embeddings vector(1536) -- text-embedding-3-large-1536-embedding (OpenAI)
|
||||
);
|
||||
"""
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# List and sort Parquet files
|
||||
parquet_files = sorted(Path(directory_path).glob("*.parquet"))
|
||||
|
||||
for file in parquet_files:
|
||||
print(f"Loading {file} into PostgreSQL")
|
||||
df = pd.read_parquet(file)
|
||||
|
||||
print(df.head())
|
||||
|
||||
data_list = [
|
||||
(
|
||||
row["_id"],
|
||||
row["title"],
|
||||
row["text"],
|
||||
np.array(row["text-embedding-3-large-1536-embedding"]),
|
||||
)
|
||||
for index, row in df.iterrows()
|
||||
]
|
||||
# Use execute_values to perform batch insertion
|
||||
execute_values(
|
||||
cursor,
|
||||
"INSERT INTO documents (_id, title, text, embeddings) VALUES %s",
|
||||
data_list,
|
||||
)
|
||||
# Commit after we insert all embeddings
|
||||
conn.commit()
|
||||
|
||||
print(f"Loaded {file} into PostgreSQL")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) != 3:
|
||||
print_usage()
|
||||
sys.exit(1)
|
||||
|
||||
conn_str = sys.argv[1]
|
||||
directory_path = sys.argv[2]
|
||||
main(conn_str, directory_path)
|
||||
@@ -0,0 +1,10 @@
|
||||
with x (x) as (
|
||||
select "embeddings" as x
|
||||
from hnsw_test_table
|
||||
TABLESAMPLE SYSTEM (1)
|
||||
LIMIT 1
|
||||
)
|
||||
SELECT title, "embeddings" <=> (select x from x) as distance
|
||||
FROM hnsw_test_table
|
||||
ORDER BY 2
|
||||
LIMIT 30;
|
||||
13
test_runner/performance/pgvector/pgbench_hnsw_queries.sql
Normal file
13
test_runner/performance/pgvector/pgbench_hnsw_queries.sql
Normal file
@@ -0,0 +1,13 @@
|
||||
-- run with pooled connection
|
||||
-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
|
||||
|
||||
with x (x) as (
|
||||
select "embeddings" as x
|
||||
from hnsw_test_table
|
||||
TABLESAMPLE SYSTEM (1)
|
||||
LIMIT 1
|
||||
)
|
||||
SELECT title, "embeddings" <=> (select x from x) as distance
|
||||
FROM hnsw_test_table
|
||||
ORDER BY 2
|
||||
LIMIT 30;
|
||||
@@ -100,6 +100,25 @@ QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
# A list of pgvector HNSW index builds to run.
|
||||
# Please do not alter the label for the query, as it is used to identify it.
|
||||
#
|
||||
# Disable auto formatting for the list of queries so that it's easier to read
|
||||
# fmt: off
|
||||
PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"),
|
||||
LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"),
|
||||
LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"),
|
||||
LabelledQuery("PGV3", r"CREATE INDEX ON hnsw_test_table (_id);"),
|
||||
LabelledQuery("PGV4", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_cosine_ops);"),
|
||||
LabelledQuery("PGV5", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_ip_ops);"),
|
||||
LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"),
|
||||
LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"),
|
||||
LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"),
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
EXPLAIN_STRING: str = "EXPLAIN (ANALYZE, VERBOSE, BUFFERS, COSTS, SETTINGS, FORMAT JSON)"
|
||||
|
||||
|
||||
@@ -245,3 +264,18 @@ def test_clickbench_collect_pg_stat_statements(remote_compare: RemoteCompare):
|
||||
log.info("Collecting pg_stat_statements")
|
||||
query = LabelledQuery("Q_COLLECT_PG_STAT_STATEMENTS", r"SELECT * from pg_stat_statements;")
|
||||
run_psql(remote_compare, query, times=1, explain=False)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("query", PGVECTOR_QUERIES)
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgvector_indexing(query: LabelledQuery, remote_compare: RemoteCompare):
|
||||
"""
|
||||
An pgvector test that tests HNSW index build performance and parallelism.
|
||||
|
||||
The DB prepared manually in advance.
|
||||
See
|
||||
- test_runner/performance/pgvector/README.md
|
||||
- test_runner/performance/pgvector/loaddata.py
|
||||
- test_runner/performance/pgvector/HNSW_build.sql
|
||||
"""
|
||||
run_psql(remote_compare, query, times=1, explain=False)
|
||||
|
||||
@@ -17,6 +17,7 @@ class PgBenchLoadType(enum.Enum):
|
||||
INIT = "init"
|
||||
SIMPLE_UPDATE = "simple-update"
|
||||
SELECT_ONLY = "select-only"
|
||||
PGVECTOR_HNSW = "pgvector-hnsw"
|
||||
|
||||
|
||||
def utc_now_timestamp() -> int:
|
||||
@@ -132,6 +133,26 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
password=password,
|
||||
)
|
||||
|
||||
if workload_type == PgBenchLoadType.PGVECTOR_HNSW:
|
||||
# Run simple-update workload
|
||||
run_pgbench(
|
||||
env,
|
||||
"pgvector-hnsw",
|
||||
[
|
||||
"pgbench",
|
||||
"-f",
|
||||
"test_runner/performance/pgvector/pgbench_custom_script_pgvector_hsnw_queries.sql",
|
||||
"-c100",
|
||||
"-j20",
|
||||
f"-T{duration}",
|
||||
"-P2",
|
||||
"--protocol=prepared",
|
||||
"--progress-timestamp",
|
||||
connstr,
|
||||
],
|
||||
password=password,
|
||||
)
|
||||
|
||||
env.report_size()
|
||||
|
||||
|
||||
@@ -201,3 +222,13 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||
|
||||
# The following test runs on an existing database that has pgvector extension installed
|
||||
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
|
||||
#
|
||||
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int):
|
||||
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)
|
||||
|
||||
22
test_runner/regress/test_gin_redo.py
Normal file
22
test_runner/regress/test_gin_redo.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
|
||||
|
||||
#
|
||||
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
|
||||
#
|
||||
def test_gin_redo(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
time.sleep(1)
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
con = primary.connect()
|
||||
cur = con.cursor()
|
||||
cur.execute("create table gin_test_tbl(id integer, i int4[])")
|
||||
cur.execute("create index gin_test_idx on gin_test_tbl using gin (i)")
|
||||
cur.execute("insert into gin_test_tbl select g,array[3, 1, g] from generate_series(1, 10000) g")
|
||||
cur.execute("delete from gin_test_tbl where id % 2 = 0")
|
||||
cur.execute("vacuum gin_test_tbl")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
@@ -531,6 +531,64 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
|
||||
asyncio.run(run_recovery_uncommitted(env))
|
||||
|
||||
|
||||
async def run_wal_truncation(env: NeonEnv):
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
(sk1, sk2, sk3) = env.safekeepers
|
||||
|
||||
ep = env.endpoints.create_start("main")
|
||||
ep.safe_psql("create table t (key int, value text)")
|
||||
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
|
||||
|
||||
# insert with only one sk3 up to create tail of flushed but not committed WAL on it
|
||||
sk1.stop()
|
||||
sk2.stop()
|
||||
conn = await ep.connect_async()
|
||||
# query should hang, so execute in separate task
|
||||
bg_query = asyncio.create_task(
|
||||
conn.execute("insert into t select generate_series(1, 180000), 'Papaya'")
|
||||
)
|
||||
sleep_sec = 2
|
||||
await asyncio.sleep(sleep_sec)
|
||||
# it must still be not finished
|
||||
assert not bg_query.done()
|
||||
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
|
||||
ep.stop_and_destroy()
|
||||
|
||||
# stop sk3 as well
|
||||
sk3.stop()
|
||||
|
||||
# now start sk1 and sk2 and make them commit something
|
||||
sk1.start()
|
||||
sk2.start()
|
||||
ep = env.endpoints.create_start(
|
||||
"main",
|
||||
)
|
||||
ep.safe_psql("insert into t select generate_series(1, 200), 'payload'")
|
||||
|
||||
# start sk3 and wait for it to catch up
|
||||
sk3.start()
|
||||
flush_lsn = Lsn(ep.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()"))
|
||||
await wait_for_lsn(sk3, tenant_id, timeline_id, flush_lsn)
|
||||
|
||||
timeline_start_lsn = sk1.get_timeline_start_lsn(tenant_id, timeline_id)
|
||||
digests = [
|
||||
sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, flush_lsn)
|
||||
for sk in [sk1, sk2]
|
||||
]
|
||||
assert digests[0] == digests[1], f"digest on sk1 is {digests[0]} but on sk3 is {digests[1]}"
|
||||
|
||||
|
||||
# Simple deterministic test creating tail of WAL on safekeeper which is
|
||||
# truncated when majority without this sk elects walproposer starting earlier.
|
||||
def test_wal_truncation(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
asyncio.run(run_wal_truncation(env))
|
||||
|
||||
|
||||
async def run_segment_init_failure(env: NeonEnv):
|
||||
env.neon_cli.create_branch("test_segment_init_failure")
|
||||
ep = env.endpoints.create_start("test_segment_init_failure")
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 0d30e28f74...17e0f5ff4e
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 74fb144890...c2c3d40534
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 3c2b9d576c...b228f20372
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"v16": ["16.3", "3c2b9d576c580e0b5b7108001f959b8c5b42e0a2"],
|
||||
"v15": ["15.7", "74fb144890c4f955db1ef50ee1eeb9d8a6c2f69d"],
|
||||
"v14": ["14.12", "0d30e28f74f49fe6a27a6bd45dcfeb1060656b8f"]
|
||||
"v16": ["16.3", "b228f20372ebcabfd7946647cb7adbd38bacb14a"],
|
||||
"v15": ["15.7", "c2c3d40534db97d83dd7e185d1971e707fa2f445"],
|
||||
"v14": ["14.12", "17e0f5ff4e1905691aa40e1e08f9b79b14c99652"]
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ regex = { version = "1" }
|
||||
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
|
||||
regex-syntax = { version = "0.8" }
|
||||
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
|
||||
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "default-tls", "stream"] }
|
||||
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "rustls-tls", "stream"] }
|
||||
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
||||
scopeguard = { version = "1" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
@@ -68,7 +68,7 @@ sha2 = { version = "0.10", features = ["asm"] }
|
||||
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
|
||||
subtle = { version = "2" }
|
||||
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
|
||||
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
|
||||
tokio-rustls = { version = "0.24" }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
|
||||
Reference in New Issue
Block a user