Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-10 14:10:37 +00:00
Compare commits (1 commit)
ci-run/pr- ... skyzh/flus
| Author | SHA1 | Date |
|---|---|---|
|  | a6898f8deb |  |
.github/workflows/benchmarking.yml (vendored, 100 changed lines)
@@ -38,11 +38,6 @@ on:
description: 'AWS-RDS and AWS-AURORA normally only run on Saturday. Set this to true to run them on every workflow_dispatch'
required: false
default: false
run_only_pgvector_tests:
type: boolean
description: 'Run pgvector tests but no other tests. If not set, all tests including pgvector tests will be run'
required: false
default: false

defaults:
run:
@@ -55,7 +50,6 @@ concurrency:

jobs:
bench:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
@@ -126,7 +120,6 @@ jobs:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

generate-matrices:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
#
# Available platforms:
@@ -204,7 +197,6 @@ jobs:
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT

pgbench-compare:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
needs: [ generate-matrices ]

strategy:
@@ -351,92 +343,6 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

pgbench-pgvector:
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "15m"
TEST_PG_BENCH_SCALES_MATRIX: "1"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 16
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-captest-pgvector"

runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init

steps:
- uses: actions/checkout@v4

- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-release-artifact
path: /tmp/neon/
prefix: latest

- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH

- name: Set up Connection String
id: set-up-connstr
run: |
CONNSTR=${{ secrets.BENCHMARK_PGVECTOR_CONNSTR }}

echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT

QUERIES=("SELECT version()")
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")

for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done

- name: Benchmark pgvector hnsw indexing
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_perf_olap.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgvector_indexing
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}

- name: Benchmark pgvector hnsw queries
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"

- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate

- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic perf testing neon-captest-pgvector: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

clickbench-compare:
# ClichBench DB for rds-aurora and rds-Postgres deployed to the same clusters
# we use for performance testing in pgbench-compare.
@@ -445,7 +351,7 @@ jobs:
#
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
if: ${{ !cancelled() }}
needs: [ generate-matrices, pgbench-compare ]

strategy:
@@ -549,7 +455,7 @@ jobs:
# We might change it after https://github.com/neondatabase/neon/issues/2900.
#
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
if: ${{ !cancelled() }}
needs: [ generate-matrices, clickbench-compare ]

strategy:
@@ -651,7 +557,7 @@ jobs:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

user-examples-compare:
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
if: ${{ !cancelled() }}
needs: [ generate-matrices, tpch-compare ]

strategy:
.github/workflows/release.yml (vendored, 2 changed lines)
@@ -53,7 +53,7 @@ jobs:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
cat << EOF > body.md
## Storage & Compute release ${RELEASE_DATE}
## Release ${RELEASE_DATE}

**Please merge this Pull Request using 'Create a merge commit' button**
EOF
Cargo.lock (generated, 155 changed lines)
@@ -776,6 +776,7 @@ dependencies = [
"pin-project",
"serde",
"time",
"tz-rs",
"url",
"uuid",
]
@@ -1290,6 +1291,12 @@ dependencies = [
"tiny-keccak",
]

[[package]]
name = "const_fn"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935"

[[package]]
name = "const_format"
version = "0.2.30"
@@ -1969,6 +1976,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"

[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]

[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"

[[package]]
name = "form_urlencoded"
version = "1.1.0"
@@ -2598,6 +2620,19 @@ dependencies = [
"tokio-io-timeout",
]

[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper 0.14.26",
"native-tls",
"tokio",
"tokio-native-tls",
]

[[package]]
name = "hyper-util"
version = "0.1.3"
@@ -3133,6 +3168,24 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"

[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]

[[package]]
name = "nix"
version = "0.25.1"
@@ -3303,6 +3356,15 @@ dependencies = [
"libc",
]

[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]

[[package]]
name = "oauth2"
version = "4.4.2"
@@ -3352,12 +3414,50 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"

[[package]]
name = "openssl"
version = "0.10.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800"
dependencies = [
"bitflags 2.4.1",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]

[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.52",
]

[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"

[[package]]
name = "openssl-sys"
version = "0.9.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]

[[package]]
name = "opentelemetry"
version = "0.20.0"
@@ -4005,6 +4105,17 @@ dependencies = [
"tokio-postgres",
]

[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"native-tls",
"tokio",
"tokio-native-tls",
"tokio-postgres",
]

[[package]]
name = "postgres-protocol"
version = "0.6.4"
@@ -4302,7 +4413,6 @@ dependencies = [
"http 1.1.0",
"http-body-util",
"humantime",
"humantime-serde",
"hyper 0.14.26",
"hyper 1.2.0",
"hyper-util",
@@ -4313,6 +4423,7 @@ dependencies = [
"md5",
"measured",
"metrics",
"native-tls",
"once_cell",
"opentelemetry",
"parking_lot 0.12.1",
@@ -4320,6 +4431,7 @@ dependencies = [
"parquet_derive",
"pbkdf2",
"pin-project-lite",
"postgres-native-tls",
"postgres-protocol",
"postgres_backend",
"pq_proto",
@@ -4338,7 +4450,6 @@ dependencies = [
"rstest",
"rustc-hash",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"rustls-pemfile 2.1.1",
"scopeguard",
"serde",
@@ -4368,6 +4479,7 @@ dependencies = [
"utils",
"uuid",
"walkdir",
"webpki-roots 0.25.2",
"workspace_hack",
"x509-parser",
]
@@ -4674,21 +4786,20 @@ dependencies = [
"http 0.2.9",
"http-body 0.4.5",
"hyper 0.14.26",
"hyper-rustls 0.24.0",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.21.11",
"rustls-pemfile 1.0.2",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-rustls 0.24.0",
"tokio-native-tls",
"tokio-util",
"tower-service",
"url",
@@ -4696,7 +4807,6 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams 0.3.0",
"web-sys",
"webpki-roots 0.25.2",
"winreg 0.50.0",
]

@@ -5122,22 +5232,20 @@ dependencies = [
"hex",
"histogram",
"itertools",
"once_cell",
"native-tls",
"pageserver",
"pageserver_api",
"postgres-native-tls",
"postgres_ffi",
"rand 0.8.5",
"remote_storage",
"reqwest 0.12.4",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"serde",
"serde_json",
"serde_with",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls 0.25.0",
"tokio-stream",
"tokio-util",
@@ -6081,6 +6189,8 @@ checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc"
dependencies = [
"itoa",
"js-sys",
"libc",
"num_threads",
"serde",
"time-core",
"time-macros",
@@ -6190,6 +6300,16 @@ dependencies = [
"syn 2.0.52",
]

[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]

[[package]]
name = "tokio-postgres"
version = "0.7.7"
@@ -6596,6 +6716,15 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"

[[package]]
name = "tz-rs"
version = "0.6.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33851b15c848fad2cf4b105c6bb66eb9512b6f6c44a4b13f57c53c73c707e2b4"
dependencies = [
"const_fn",
]

[[package]]
name = "uname"
version = "0.1.1"
@@ -7500,9 +7629,9 @@ dependencies = [

[[package]]
name = "zeroize"
version = "1.7.0"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
dependencies = [
"zeroize_derive",
]
Cargo.toml (15 changed lines)
@@ -46,10 +46,10 @@ anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_core = "0.19"
azure_identity = "0.19"
azure_storage = "0.19"
azure_storage_blobs = "0.19"
flate2 = "1.0.26"
async-stream = "0.3"
async-trait = "0.1"
@@ -114,6 +114,7 @@ md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }
memoffset = "0.8"
native-tls = "0.2"
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
notify = "6.0.0"
num_cpus = "1.15"
@@ -190,7 +191,7 @@ url = "2.2"
urlencoding = "2.1"
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
rustls-native-certs = "0.7"
webpki-roots = "0.25"
x509-parser = "0.15"

## TODO replace this with tracing
@@ -199,6 +200,7 @@ log = "0.4"

## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
@@ -239,7 +241,8 @@ tonic-build = "0.9"

[patch.crates-io]

# Needed to get `tokio-postgres-rustls` to depend on our fork.
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }

# bug fixes for UUID
@@ -99,13 +99,6 @@ name = "async-executor"
[[bans.deny]]
name = "smol"

[[bans.deny]]
# We want to use rustls instead of the platform's native tls implementation.
name = "native-tls"

[[bans.deny]]
name = "openssl"

# This section is considered when running `cargo deny check sources`.
# More documentation about the 'sources' section can be found here:
# https://embarkstudios.github.io/cargo-deny/checks/sources/cfg.html
@@ -29,6 +29,7 @@ use http_types::{StatusCode, Url};
use tokio_util::sync::CancellationToken;
use tracing::debug;

use crate::RemoteStorageActivity;
use crate::{
error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download,
DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
@@ -525,6 +526,10 @@ impl RemoteStorage for AzureBlobStorage {
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
Err(TimeTravelError::Unimplemented)
}

fn activity(&self) -> RemoteStorageActivity {
self.concurrency_limiter.activity()
}
}

pin_project_lite::pin_project! {

@@ -263,6 +263,17 @@ pub trait RemoteStorage: Send + Sync + 'static {
done_if_after: SystemTime,
cancel: &CancellationToken,
) -> Result<(), TimeTravelError>;

/// Query how busy we currently are: may be used by callers which wish to politely
/// back off if there are already a lot of operations underway.
fn activity(&self) -> RemoteStorageActivity;
}

pub struct RemoteStorageActivity {
pub read_available: usize,
pub read_total: usize,
pub write_available: usize,
pub write_total: usize,
}
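The new `activity()` hook only exposes semaphore counters; acting on them is left to the caller. A minimal caller-side sketch of the "politely back off" idea from the doc comment might look like the following (the `should_throttle` helper and its threshold are hypothetical, not part of the diff):

```rust
use remote_storage::RemoteStorageActivity;

/// Hypothetical helper: true when most read permits are already taken,
/// i.e. a background task should defer non-urgent downloads.
fn should_throttle(activity: &RemoteStorageActivity) -> bool {
    // Back off once fewer than a quarter of the read permits remain free.
    activity.read_available * 4 < activity.read_total
}

fn plan_next_batch(activity: RemoteStorageActivity) {
    if should_throttle(&activity) {
        // e.g. sleep, shrink the batch, or skip this iteration entirely
    }
}
```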
/// DownloadStream is sensitive to the timeout and cancellation used with the original
@@ -444,6 +455,15 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
}

pub fn activity(&self) -> RemoteStorageActivity {
match self {
Self::LocalFs(s) => s.activity(),
Self::AwsS3(s) => s.activity(),
Self::AzureBlob(s) => s.activity(),
Self::Unreliable(s) => s.activity(),
}
}
}

impl GenericRemoteStorage {
@@ -774,6 +794,9 @@ struct ConcurrencyLimiter {
// The helps to ensure we don't exceed the thresholds.
write: Arc<Semaphore>,
read: Arc<Semaphore>,

write_total: usize,
read_total: usize,
}

impl ConcurrencyLimiter {
@@ -802,10 +825,21 @@ impl ConcurrencyLimiter {
Arc::clone(self.for_kind(kind)).acquire_owned().await
}

fn activity(&self) -> RemoteStorageActivity {
RemoteStorageActivity {
read_available: self.read.available_permits(),
read_total: self.read_total,
write_available: self.write.available_permits(),
write_total: self.write_total,
}
}

fn new(limit: usize) -> ConcurrencyLimiter {
Self {
read: Arc::new(Semaphore::new(limit)),
write: Arc::new(Semaphore::new(limit)),
read_total: limit,
write_total: limit,
}
}
}
@@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
use utils::crashsafe::path_with_suffix_extension;

use crate::{
Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
REMOTE_STORAGE_PREFIX_SEPARATOR,
Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorageActivity,
TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
};

use super::{RemoteStorage, StorageMetadata};
@@ -605,6 +605,16 @@ impl RemoteStorage for LocalFs {
) -> Result<(), TimeTravelError> {
Err(TimeTravelError::Unimplemented)
}

fn activity(&self) -> RemoteStorageActivity {
// LocalFS has no concurrency limiting: give callers the impression that plenty of units are available
RemoteStorageActivity {
read_available: 16,
read_total: 16,
write_available: 16,
write_total: 16,
}
}
}

fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
@@ -47,8 +47,8 @@ use utils::backoff;
use super::StorageMetadata;
use crate::{
error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError,
Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel,
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
Listing, ListingMode, RemotePath, RemoteStorage, RemoteStorageActivity, S3Config,
TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};

pub(super) mod metrics;
@@ -975,6 +975,10 @@ impl RemoteStorage for S3Bucket {
}
Ok(())
}

fn activity(&self) -> RemoteStorageActivity {
self.concurrency_limiter.activity()
}
}

/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
@@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken;

use crate::{
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
StorageMetadata, TimeTravelError,
RemoteStorageActivity, StorageMetadata, TimeTravelError,
};

pub struct UnreliableWrapper {
@@ -213,4 +213,8 @@ impl RemoteStorage for UnreliableWrapper {
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}

fn activity(&self) -> RemoteStorageActivity {
self.inner.activity()
}
}
@@ -9,33 +9,6 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;

/// Declare a failpoint that can use the `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint {}", $name);
fail::fail_point!($name);
}
})
.await
.expect("spawn_blocking");
}
};
($name:literal, $cond:expr) => {
if cfg!(feature = "testing") {
if $cond {
pausable_failpoint!($name)
}
}
};
}

/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
@@ -135,8 +135,7 @@ impl Gate {
let started_at = std::time::Instant::now();
let mut do_close = std::pin::pin!(self.do_close());

// with 1s we rarely saw anything, let's try if we get more gate closing reasons with 100ms
let nag_after = Duration::from_millis(100);
let nag_after = Duration::from_secs(1);

let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
return;
@@ -380,8 +380,8 @@ impl interface::CompactionLayer<Key> for MockLayer {
}
fn file_size(&self) -> u64 {
match self {
MockLayer::Delta(this) => this.file_size,
MockLayer::Image(this) => this.file_size,
MockLayer::Delta(this) => this.file_size(),
MockLayer::Image(this) => this.file_size(),
}
}
fn short_id(&self) -> String {
@@ -2,7 +2,7 @@ use std::collections::HashMap;

use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::{metadata::TimelineMetadata, IndexPart};
use utils::lsn::Lsn;
@@ -19,7 +19,7 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
let des: IndexPart = IndexPart::from_s3_bytes(&bytes).context("deserialize")?;
#[derive(serde::Serialize)]
struct Output<'a> {
layer_metadata: &'a HashMap<LayerName, LayerFileMetadata>,
layer_metadata: &'a HashMap<LayerName, IndexLayerMetadata>,
disk_consistent_lsn: Lsn,
timeline_metadata: &'a TimelineMetadata,
}
@@ -534,7 +534,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
});
}
EvictionLayer::Secondary(layer) => {
let file_size = layer.metadata.file_size;
let file_size = layer.metadata.file_size();

js.spawn(async move {
layer
@@ -641,7 +641,7 @@ impl EvictionLayer {
pub(crate) fn get_file_size(&self) -> u64 {
match self {
Self::Attached(l) => l.layer_desc().file_size,
Self::Secondary(sl) => sl.metadata.file_size,
Self::Secondary(sl) => sl.metadata.file_size(),
}
}
}
@@ -260,8 +260,6 @@ async fn page_service_conn_main(
socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
let socket = std::pin::pin!(socket);

fail::fail_point!("ps::connection-start::pre-login");

// XXX: pgbackend.run() should take the connection_ctx,
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
@@ -605,7 +603,6 @@ impl PageServerHandler {
};

trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");

// Trace request if needed
if let Some(t) = tracer.as_mut() {
@@ -620,7 +617,6 @@ impl PageServerHandler {

let (response, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
@@ -630,7 +626,6 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::Nblocks(req) => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
@@ -640,7 +635,6 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::GetPage(req) => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
@@ -651,7 +645,6 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::DbSize(req) => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
@@ -661,7 +654,6 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
@@ -1513,7 +1505,6 @@ where
_pgb: &mut PostgresBackend<IO>,
_sm: &FeStartupPacket,
) -> Result<(), QueryError> {
fail::fail_point!("ps::connection-start::startup-packet");
Ok(())
}

@@ -1528,8 +1519,6 @@ where
Err(QueryError::SimulatedConnectionError)
});

fail::fail_point!("ps::connection-start::process-query");

let ctx = self.connection_ctx.attached_child();
debug!("process query {query_string:?}");
let parts = query_string.split_whitespace().collect::<Vec<_>>();
@@ -42,7 +42,6 @@ use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
use utils::fs_ext;
use utils::pausable_failpoint;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
use utils::timeout::timeout_cancellable;
@@ -123,6 +122,32 @@ use utils::{
lsn::{Lsn, RecordLsn},
};

/// Declare a failpoint that can use the `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
macro_rules! pausable_failpoint {
($name:literal) => {
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint {}", $name);
fail::fail_point!($name);
}
})
.await
.expect("spawn_blocking");
}
};
($name:literal, $cond:expr) => {
if cfg!(feature = "testing") {
if $cond {
pausable_failpoint!($name)
}
}
};
}
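For readers unfamiliar with the failpoint machinery: the macro wraps `fail::fail_point!` in `spawn_blocking` so that a `pause` action blocks a blocking-pool thread rather than the async executor. A rough usage sketch, with the failpoint name and the surrounding function invented for illustration:

```rust
async fn flush_frozen_layer() {
    // A test can pause this task with: fail::cfg("flush-frozen-pausable", "pause")
    // and later release it with:       fail::cfg("flush-frozen-pausable", "off")
    pausable_failpoint!("flush-frozen-pausable");

    // ... the actual flush work would go here ...
}
```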
pub mod blob_io;
pub mod block_io;
pub mod vectored_blob_io;
@@ -8,7 +8,7 @@ use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, Instrument};

use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId};

use crate::{
config::PageServerConf,
@@ -197,7 +197,6 @@ pub(crate) use upload::upload_initdb_dir;
use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::pausable_failpoint;

use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
@@ -1193,7 +1192,7 @@ impl RemoteTimelineClient {
&self.storage_impl,
uploaded.local_path(),
&remote_path,
uploaded.metadata().file_size,
uploaded.metadata().file_size(),
cancel,
)
.await
@@ -1574,7 +1573,7 @@ impl RemoteTimelineClient {
&self.storage_impl,
local_path,
&remote_path,
layer_metadata.file_size,
layer_metadata.file_size(),
&self.cancel,
)
.measure_remote_op(
@@ -1769,7 +1768,7 @@ impl RemoteTimelineClient {
UploadOp::UploadLayer(_, m) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
),
UploadOp::UploadMetadata(_, _) => (
RemoteOpFileKind::Index,
@@ -84,7 +84,7 @@ pub async fn download_layer_file<'a>(
)
.await?;

let expected = layer_metadata.file_size;
let expected = layer_metadata.file_size();
if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!(
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
@@ -17,6 +17,46 @@ use pageserver_api::shard::ShardIndex;

use utils::lsn::Lsn;

/// Metadata gathered for each of the layer files.
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
//#[cfg_attr(test, derive(Default))]
pub struct LayerFileMetadata {
file_size: u64,

pub(crate) generation: Generation,

pub(crate) shard: ShardIndex,
}

impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
fn from(other: &IndexLayerMetadata) -> Self {
LayerFileMetadata {
file_size: other.file_size,
generation: other.generation,
shard: other.shard,
}
}
}

impl LayerFileMetadata {
pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
LayerFileMetadata {
file_size,
generation,
shard,
}
}

pub fn file_size(&self) -> u64 {
self.file_size
}
}

// TODO seems like another part of the remote storage file format
// compatibility issue, see https://github.com/neondatabase/neon/issues/3072
/// In-memory representation of an `index_part.json` file
///
/// Contains the data about all files in the timeline, present remotely and its metadata.
@@ -37,7 +77,7 @@ pub struct IndexPart {
///
/// Older versions of `IndexPart` will not have this property or have only a part of metadata
/// that latest version stores.
pub layer_metadata: HashMap<LayerName, LayerFileMetadata>,
pub layer_metadata: HashMap<LayerName, IndexLayerMetadata>,

// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
// It's duplicated for convenience when reading the serialized structure, but is
@@ -87,7 +127,10 @@ impl IndexPart {
lineage: Lineage,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> Self {
let layer_metadata = layers_and_metadata.clone();
let layer_metadata = layers_and_metadata
.iter()
.map(|(k, v)| (k.to_owned(), IndexLayerMetadata::from(v)))
.collect();

Self {
version: Self::LATEST_VERSION,
@@ -151,12 +194,9 @@ impl From<&UploadQueueInitialized> for IndexPart {
}
}

/// Metadata gathered for each of the layer files.
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LayerFileMetadata {
/// Serialized form of [`LayerFileMetadata`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexLayerMetadata {
pub file_size: u64,

#[serde(default = "Generation::none")]
@@ -168,12 +208,12 @@ pub struct LayerFileMetadata {
pub shard: ShardIndex,
}

impl LayerFileMetadata {
pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
LayerFileMetadata {
file_size,
generation,
shard,
impl From<&LayerFileMetadata> for IndexLayerMetadata {
fn from(other: &LayerFileMetadata) -> Self {
IndexLayerMetadata {
file_size: other.file_size,
generation: other.generation,
shard: other.shard,
}
}
}
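Taken together, this hunk splits the old struct in two: `LayerFileMetadata` stays as the in-memory type (now with a private `file_size` field and a `file_size()` accessor) while `IndexLayerMetadata` becomes the serialized form stored in `index_part.json`, with `From` impls converting in both directions. A rough round-trip sketch, written as if it lived inside this module's test code so the types above are in scope (the concrete values are only for illustration):

```rust
#[test]
fn layer_metadata_round_trip() {
    // In-memory value, as produced by LayerFileMetadata::new().
    let in_memory =
        LayerFileMetadata::new(25_600_000, Generation::none(), ShardIndex::unsharded());

    // Serialized form, as stored in the index_part.json layer_metadata map.
    let serialized = IndexLayerMetadata::from(&in_memory);
    assert_eq!(serialized.file_size, in_memory.file_size());

    // Converting back should give the same in-memory metadata.
    assert_eq!(LayerFileMetadata::from(&serialized), in_memory);
}
```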
@@ -267,12 +307,12 @@ mod tests {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
@@ -309,12 +349,12 @@ mod tests {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
@@ -352,12 +392,12 @@ mod tests {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 2,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
@@ -440,12 +480,12 @@ mod tests {
let expected = IndexPart {
version: 4,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
@@ -482,12 +522,12 @@ mod tests {
let expected = IndexPart {
version: 5,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF420-00000000014EF499".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF420-00000000014EF499".parse().unwrap(), IndexLayerMetadata {
file_size: 23289856,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), IndexLayerMetadata {
file_size: 1015808,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
@@ -529,12 +569,12 @@ mod tests {
let expected = IndexPart {
version: 6,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
@@ -9,7 +9,7 @@ use std::time::SystemTime;
use tokio::fs::{self, File};
use tokio::io::AsyncSeekExt;
use tokio_util::sync::CancellationToken;
use utils::{backoff, pausable_failpoint};
use utils::backoff;

use super::Generation;
use crate::tenant::remote_timeline_client::{
@@ -187,7 +187,6 @@ impl SecondaryTenant {
};

let now = SystemTime::now();
tracing::info!("Evicting secondary layer");

let this = self.clone();
@@ -45,10 +45,10 @@ use crate::tenant::{

use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::Future;
use futures::{Future, StreamExt};
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
use remote_storage::{DownloadError, Etag, GenericRemoteStorage, RemoteStorageActivity};

use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, warn, Instrument};
@@ -67,6 +67,12 @@ use super::{
/// download, if the uploader populated it.
const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);

/// Range of concurrency we may use when downloading layers within a timeline. This is independent
/// for each tenant we're downloading: the concurrency of _tenants_ is defined separately in
/// `PageServerConf::secondary_download_concurrency`
const MAX_LAYER_CONCURRENCY: usize = 16;
const MIN_LAYER_CONCURRENCY: usize = 1;

pub(super) async fn downloader_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
@@ -75,14 +81,15 @@ pub(super) async fn downloader_task(
cancel: CancellationToken,
root_ctx: RequestContext,
) {
let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
// How many tenants' secondary download operations we will run concurrently
let tenant_concurrency = tenant_manager.get_conf().secondary_download_concurrency;

let generator = SecondaryDownloader {
tenant_manager,
remote_storage,
root_ctx,
};
let mut scheduler = Scheduler::new(generator, concurrency);
let mut scheduler = Scheduler::new(generator, tenant_concurrency);

scheduler
.run(command_queue, background_jobs_can_start, cancel)
@@ -407,7 +414,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
tracing::warn!("Insufficient space while downloading. Will retry later.");
}
Err(UpdateError::Cancelled) => {
tracing::info!("Shut down while downloading");
tracing::debug!("Shut down while downloading");
},
Err(UpdateError::Deserialize(e)) => {
tracing::error!("Corrupt content while downloading tenant: {e}");
@@ -709,7 +716,7 @@ impl<'a> TenantDownloader<'a> {
let mut layer_byte_count: u64 = timeline_state
.on_disk_layers
.values()
.map(|l| l.metadata.file_size)
.map(|l| l.metadata.file_size())
.sum();

// Remove on-disk layers that are no longer present in heatmap
@@ -720,7 +727,7 @@ impl<'a> TenantDownloader<'a> {
.get(layer_file_name)
.unwrap()
.metadata
.file_size;
.file_size();

let local_path = local_layer_path(
self.conf,
@@ -841,6 +848,8 @@ impl<'a> TenantDownloader<'a> {

tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());

let mut download_futs = Vec::new();

// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
@@ -877,7 +886,9 @@ impl<'a> TenantDownloader<'a> {
}
}

if on_disk.metadata != layer.metadata || on_disk.access_time != layer.access_time {
if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
|| on_disk.access_time != layer.access_time
{
// We already have this layer on disk. Update its access time.
tracing::debug!(
"Access time updated for layer {}: {} -> {}",
@@ -909,19 +920,35 @@ impl<'a> TenantDownloader<'a> {
strftime(&layer.access_time),
strftime(evicted_at)
);
self.skip_layer(layer);
continue;
}
}

match self
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
.await?
{
Some(layer) => touched.push(layer),
None => {
// Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
// things to consider touched.
download_futs.push(self.download_layer(
tenant_shard_id,
&timeline.timeline_id,
layer,
ctx,
));
}

// Break up layer downloads into chunks, so that for each chunk we can re-check how much
// concurrency to use based on activity level of remote storage.
while !download_futs.is_empty() {
let chunk =
download_futs.split_off(download_futs.len().saturating_sub(MAX_LAYER_CONCURRENCY));

let concurrency = Self::layer_concurrency(self.remote_storage.activity());

let mut result_stream = futures::stream::iter(chunk).buffered(concurrency);
let mut result_stream = std::pin::pin!(result_stream);
while let Some(result) = result_stream.next().await {
match result {
Err(e) => return Err(e),
Ok(None) => {
// No error, but we didn't download the layer. Don't mark it touched
}
Ok(Some(layer)) => touched.push(layer),
}
}
}
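The chunking above re-reads `activity()` before each batch and then drives at most `concurrency` download futures at once via `StreamExt::buffered`. Stripped of the downloader specifics, the pattern looks roughly like the following standalone sketch (the `fetch` function and the fixed concurrency cap are stand-ins, not code from the repo):

```rust
use futures::StreamExt;

// Stand-in for a single layer download.
async fn fetch(i: usize) -> anyhow::Result<usize> {
    Ok(i)
}

async fn run_in_chunks(mut work: Vec<usize>, chunk_size: usize) -> anyhow::Result<()> {
    while !work.is_empty() {
        // Take up to `chunk_size` items off the end, as the downloader does with split_off.
        let chunk: Vec<_> = work.split_off(work.len().saturating_sub(chunk_size));

        // Re-decide the concurrency per chunk; the downloader derives this from activity().
        let concurrency = chunk_size.min(4);

        // Drive at most `concurrency` futures at a time, yielding results in order.
        let mut results = std::pin::pin!(
            futures::stream::iter(chunk.into_iter().map(fetch)).buffered(concurrency)
        );
        while let Some(res) = results.next().await {
            res?;
        }
    }
    Ok(())
}
```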
@@ -952,7 +979,7 @@
tenant_shard_id,
&timeline.timeline_id,
t.name,
t.metadata.clone(),
LayerFileMetadata::from(&t.metadata),
t.access_time,
local_path,
));
@@ -964,15 +991,6 @@
Ok(())
}

/// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
fn skip_layer(&self, layer: HeatMapLayer) {
let mut progress = self.secondary_state.progress.lock().unwrap();
progress.layers_total = progress.layers_total.saturating_sub(1);
progress.bytes_total = progress
.bytes_total
.saturating_sub(layer.metadata.file_size);
}

async fn download_layer(
&self,
tenant_shard_id: &TenantShardId,
@@ -1006,7 +1024,7 @@ impl<'a> TenantDownloader<'a> {
*tenant_shard_id,
*timeline_id,
&layer.name,
&layer.metadata,
&LayerFileMetadata::from(&layer.metadata),
&local_path,
&self.secondary_state.cancel,
ctx,
@@ -1022,7 +1040,13 @@ impl<'a> TenantDownloader<'a> {
"Skipped downloading missing layer {}, raced with compaction/gc?",
layer.name
);
self.skip_layer(layer);

// If the layer is 404, adjust the progress statistics to reflect that we will not download it.
let mut progress = self.secondary_state.progress.lock().unwrap();
progress.layers_total = progress.layers_total.saturating_sub(1);
progress.bytes_total = progress
.bytes_total
.saturating_sub(layer.metadata.file_size);

return Ok(None);
}
@@ -1059,6 +1083,19 @@ impl<'a> TenantDownloader<'a> {

Ok(Some(layer))
}

/// Calculate the currently allowed parallelism of layer download tasks, based on activity level of the remote storage
fn layer_concurrency(activity: RemoteStorageActivity) -> usize {
// When less than 75% of units are available, use minimum concurrency. Else, do a linear mapping
// of our concurrency range to the units available within the remaining 25%.
let clamp_at = (activity.read_total * 3) / 4;
if activity.read_available > clamp_at {
(MAX_LAYER_CONCURRENCY * (activity.read_available - clamp_at))
/ (activity.read_total - clamp_at)
} else {
MIN_LAYER_CONCURRENCY
}
}
}
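To make the mapping concrete with the defaults in this diff (`read_total` of 16, so `clamp_at = 12`), the arithmetic works out as in the worked example below, which matches the test cases further down:

```rust
// Worked example of layer_concurrency() with read_total = 16, clamp_at = (16 * 3) / 4 = 12:
//   read_available = 16 -> 16 * (16 - 12) / (16 - 12) = 16  -> MAX_LAYER_CONCURRENCY
//   read_available = 14 -> 16 * (14 - 12) / (16 - 12) = 8   -> half of the maximum
//   read_available = 12 -> not above clamp_at              -> MIN_LAYER_CONCURRENCY (1)
```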
/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
@@ -1148,7 +1185,7 @@ async fn init_timeline_state(
tenant_shard_id,
&heatmap.timeline_id,
name,
remote_meta.metadata.clone(),
LayerFileMetadata::from(&remote_meta.metadata),
remote_meta.access_time,
file_path,
),
@@ -1182,3 +1219,58 @@ async fn init_timeline_state(

detail
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn layer_concurrency() {
// Totally idle
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 16,
read_total: 16,
write_available: 16,
write_total: 16
}),
MAX_LAYER_CONCURRENCY
);

// Totally busy
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 0,
read_total: 16,

write_available: 16,
write_total: 16
}),
MIN_LAYER_CONCURRENCY
);

// Edge of the range at which we interpolate
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 12,
read_total: 16,

write_available: 16,
write_total: 16
}),
MIN_LAYER_CONCURRENCY
);

// Midpoint of the range in which we interpolate
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 14,
read_total: 16,

write_available: 16,
write_total: 16
}),
MAX_LAYER_CONCURRENCY / 2
);
}
}
@@ -1,6 +1,6 @@
use std::time::SystemTime;

use crate::tenant::{remote_timeline_client::index::LayerFileMetadata, storage_layer::LayerName};
use crate::tenant::{remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerName};

use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
@@ -38,7 +38,7 @@ pub(crate) struct HeatMapTimeline {
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(super) name: LayerName,
pub(super) metadata: LayerFileMetadata,
pub(super) metadata: IndexLayerMetadata,

#[serde_as(as = "TimestampSeconds<i64>")]
pub(super) access_time: SystemTime,
@@ -49,7 +49,7 @@ pub(crate) struct HeatMapLayer {
impl HeatMapLayer {
pub(crate) fn new(
name: LayerName,
metadata: LayerFileMetadata,
metadata: IndexLayerMetadata,
access_time: SystemTime,
) -> Self {
Self {
@@ -47,7 +47,7 @@ use hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
@@ -473,7 +473,7 @@ impl ImageLayerInner {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let reads = self
|
||||
.plan_reads(keyspace, None, ctx)
|
||||
.plan_reads(keyspace, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
@@ -485,15 +485,9 @@ impl ImageLayerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Traverse the layer's index to build read operations on the overlap of the input keyspace
|
||||
/// and the keys in this layer.
|
||||
///
|
||||
/// If shard_identity is provided, it will be used to filter keys down to those stored on
|
||||
/// this shard.
|
||||
async fn plan_reads(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
shard_identity: Option<&ShardIdentity>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<VectoredRead>> {
|
||||
let mut planner = VectoredReadPlanner::new(
|
||||
@@ -513,6 +507,7 @@ impl ImageLayerInner {
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
range.start.write_to_byte_slice(&mut search_key);
|
||||
|
||||
@@ -525,22 +520,12 @@ impl ImageLayerInner {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
|
||||
let flag = if let Some(shard_identity) = shard_identity {
|
||||
if shard_identity.is_key_disposable(&key) {
|
||||
BlobFlag::Ignore
|
||||
} else {
|
||||
BlobFlag::None
|
||||
}
|
||||
} else {
|
||||
BlobFlag::None
|
||||
};
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, flag);
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -553,50 +538,6 @@ impl ImageLayerInner {
|
||||
Ok(planner.finish())
|
||||
}
|
||||
|
||||
/// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
|
||||
/// then execute vectored GET operations, passing the results of all read keys into the writer.
|
||||
pub(super) async fn filter(
|
||||
&self,
|
||||
shard_identity: &ShardIdentity,
|
||||
writer: &mut ImageLayerWriter,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<usize> {
|
||||
// Fragment the range into the regions owned by this ShardIdentity
|
||||
let plan = self
|
||||
.plan_reads(
|
||||
KeySpace {
|
||||
// If asked for the total key space, plan_reads will give us all the keys in the layer
|
||||
ranges: vec![Key::MIN..Key::MAX],
|
||||
},
|
||||
Some(shard_identity),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
let mut key_count = 0;
|
||||
for read in plan.into_iter() {
|
||||
let buf_size = read.size();
|
||||
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
|
||||
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
|
||||
key_count += 1;
|
||||
writer
|
||||
.put_image(meta.meta.key, img_buf, ctx)
|
||||
.await
|
||||
.context(format!("Storing key {}", meta.meta.key))?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(key_count)
|
||||
}
|
||||
|
||||
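The shard-aware read planning above boils down to tagging each key with a flag before it reaches the planner: keys the shard does not own are planned as `Ignore` so their bytes are skipped. Below is a self-contained sketch of just that classification step; the `not_ours` predicate and the round-robin stripe layout are assumptions for illustration, standing in for `ShardIdentity::is_key_disposable`.

```rust
#[derive(Debug)]
enum BlobFlag {
    None,
    Ignore,
}

/// Mirror of the flag selection in `plan_reads`: with a shard filter present,
/// disposable keys are ignored; without one, everything is read.
fn classify<F: Fn(u64) -> bool>(key: u64, is_key_disposable: Option<&F>) -> BlobFlag {
    match is_key_disposable {
        Some(disposable) if disposable(key) => BlobFlag::Ignore,
        _ => BlobFlag::None,
    }
}

fn main() {
    // Pretend stripes of 4 keys are distributed round-robin over 4 shards and
    // we are shard 0: anything outside stripes 0, 4, 8, ... is disposable.
    let not_ours = |key: u64| (key / 4) % 4 != 0;
    for key in 0..8 {
        println!("key {key} -> {:?}", classify(key, Some(&not_ours)));
    }
}
```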
async fn do_reads_and_update_state(
|
||||
&self,
|
||||
reads: Vec<VectoredRead>,
|
||||
@@ -914,136 +855,3 @@ impl Drop for ImageLayerWriter {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::{
|
||||
key::Key,
|
||||
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
|
||||
};
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
|
||||
|
||||
use super::ImageLayerWriter;
|
||||
|
||||
#[tokio::test]
|
||||
async fn image_layer_rewrite() {
|
||||
let harness = TenantHarness::create("test_image_layer_rewrite").unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
// The LSN at which we will create an image layer to filter
|
||||
let lsn = Lsn(0xdeadbeef0000);
|
||||
|
||||
let timeline_id = TimelineId::generate();
|
||||
let timeline = tenant
|
||||
.create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
|
||||
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
|
||||
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
|
||||
let range = input_start..input_end;
|
||||
|
||||
// Build an image layer to filter
|
||||
let resident = {
|
||||
let mut writer = ImageLayerWriter::new(
|
||||
harness.conf,
|
||||
timeline_id,
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
writer.finish(&timeline, &ctx).await.unwrap()
|
||||
};
|
||||
let original_size = resident.metadata().file_size;
|
||||
|
||||
// Filter for various shards: this exercises cases like values at start of key range, end of key
|
||||
// range, middle of key range.
|
||||
for shard_number in 0..4 {
|
||||
let mut filtered_writer = ImageLayerWriter::new(
|
||||
harness.conf,
|
||||
timeline_id,
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// TenantHarness gave us an unsharded tenant, but we'll use a sharded ShardIdentity
|
||||
// to exercise filter()
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(shard_number),
|
||||
ShardCount::new(4),
|
||||
ShardStripeSize(0x8000),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let wrote_keys = resident
|
||||
.filter(&shard_identity, &mut filtered_writer, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let replacement = if wrote_keys > 0 {
|
||||
Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// This exact size and those below will need updating as/when the layer encoding changes, but
// should be deterministic for a given version of the format, as we used no randomness when generating the input.
|
||||
assert_eq!(original_size, 1597440);
|
||||
|
||||
match shard_number {
|
||||
0 => {
|
||||
// We should have written out just one stripe for our shard identity
|
||||
assert_eq!(wrote_keys, 0x8000);
|
||||
let replacement = replacement.unwrap();
|
||||
|
||||
// We should have dropped some of the data
|
||||
assert!(replacement.metadata().file_size < original_size);
|
||||
assert!(replacement.metadata().file_size > 0);
|
||||
|
||||
// Assert that we dropped ~3/4 of the data.
|
||||
assert_eq!(replacement.metadata().file_size, 417792);
|
||||
}
|
||||
1 => {
|
||||
// Shard 1 has no keys in our input range
|
||||
assert_eq!(wrote_keys, 0x0);
|
||||
assert!(replacement.is_none());
|
||||
}
|
||||
2 => {
|
||||
// Shard 2 has one stripe in the input range
|
||||
assert_eq!(wrote_keys, 0x8000);
|
||||
let replacement = replacement.unwrap();
|
||||
assert!(replacement.metadata().file_size < original_size);
|
||||
assert!(replacement.metadata().file_size > 0);
|
||||
assert_eq!(replacement.metadata().file_size, 417792);
|
||||
}
|
||||
3 => {
|
||||
// Shard 3 has two stripes in the input range
|
||||
assert_eq!(wrote_keys, 0x10000);
|
||||
let replacement = replacement.unwrap();
|
||||
assert!(replacement.metadata().file_size < original_size);
|
||||
assert!(replacement.metadata().file_size > 0);
|
||||
assert_eq!(replacement.metadata().file_size, 811008);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
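The key counts asserted per shard in the test above are simply `owned_stripes * stripe_size`. A few lines to make that arithmetic explicit; which shard owns which stripe is decided by `ShardIdentity`'s hashing and is read off the assertions here, not recomputed.

```rust
fn main() {
    let stripe_size: u64 = 0x8000;
    // Stripes owned within the test's input range, taken from the assertions above.
    for (shard, owned_stripes) in [(0u8, 1u64), (1, 0), (2, 1), (3, 2)] {
        println!(
            "shard {shard}: {owned_stripes} stripe(s) -> {:#x} keys",
            owned_stripes * stripe_size
        );
    }
}
```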
@@ -4,7 +4,7 @@ use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
|
||||
use pageserver_api::shard::{ShardIndex, TenantShardId};
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -23,10 +23,10 @@ use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
|
||||
|
||||
use super::delta_layer::{self, DeltaEntry};
|
||||
use super::image_layer::{self};
|
||||
use super::image_layer;
|
||||
use super::{
|
||||
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
|
||||
PersistentLayerDesc, ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
|
||||
AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerName, PersistentLayerDesc,
|
||||
ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
|
||||
};
|
||||
|
||||
use utils::generation::Generation;
|
||||
@@ -161,7 +161,7 @@ impl Layer {
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
file_name,
|
||||
metadata.file_size,
|
||||
metadata.file_size(),
|
||||
);
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
|
||||
@@ -194,7 +194,7 @@ impl Layer {
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
file_name,
|
||||
metadata.file_size,
|
||||
metadata.file_size(),
|
||||
);
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
|
||||
@@ -227,7 +227,7 @@ impl Layer {
|
||||
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_add(metadata.file_size);
|
||||
.resident_physical_size_add(metadata.file_size());
|
||||
|
||||
ResidentLayer { downloaded, owner }
|
||||
}
|
||||
@@ -1802,15 +1802,16 @@ impl ResidentLayer {
|
||||
use LayerKind::*;
|
||||
|
||||
let owner = &self.owner.0;
|
||||
|
||||
match self.downloaded.get(owner, ctx).await? {
|
||||
Delta(ref d) => {
|
||||
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
|
||||
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
|
||||
// while it's being held.
|
||||
owner
|
||||
.access_stats
|
||||
.record_access(LayerAccessKind::KeyIter, ctx);
|
||||
|
||||
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
|
||||
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
|
||||
// while it's being held.
|
||||
delta_layer::DeltaLayerInner::load_keys(d, ctx)
|
||||
.await
|
||||
.with_context(|| format!("Layer index is corrupted for {self}"))
|
||||
@@ -1819,23 +1820,6 @@ impl ResidentLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Read all they keys in this layer which match the ShardIdentity, and write them all to
|
||||
/// the provided writer. Return the number of keys written.
|
||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
|
||||
pub(crate) async fn filter<'a>(
|
||||
&'a self,
|
||||
shard_identity: &ShardIdentity,
|
||||
writer: &mut ImageLayerWriter,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<usize> {
|
||||
use LayerKind::*;
|
||||
|
||||
match self.downloaded.get(&self.owner.0, ctx).await? {
|
||||
Delta(_) => anyhow::bail!(format!("cannot filter() on a delta layer {self}")),
|
||||
Image(i) => i.filter(shard_identity, writer, ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the amount of keys and values written to the writer.
|
||||
pub(crate) async fn copy_delta_prefix(
|
||||
&self,
|
||||
|
||||
@@ -17,7 +17,7 @@ use crate::tenant::{Tenant, TenantState};
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{backoff, completion, pausable_failpoint};
|
||||
use utils::{backoff, completion};
|
||||
|
||||
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
|
||||
@@ -41,7 +41,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
fs_ext, pausable_failpoint,
|
||||
sync::gate::{Gate, GateGuard},
|
||||
vec_map::VecMap,
|
||||
};
|
||||
@@ -61,7 +60,6 @@ use std::{
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
tenant::{
|
||||
@@ -90,6 +88,9 @@ use crate::{
|
||||
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
|
||||
};
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
use crate::{
|
||||
pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::timeline::init::LocalLayerFileMetadata,
|
||||
};
|
||||
use crate::{
|
||||
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
|
||||
virtual_file::{MaybeFatalIo, VirtualFile},
|
||||
@@ -1423,7 +1424,7 @@ impl Timeline {
|
||||
let layer_map = guard.layer_map();
|
||||
let mut size = 0;
|
||||
for l in layer_map.iter_historic_layers() {
|
||||
size += l.file_size;
|
||||
size += l.file_size();
|
||||
}
|
||||
size
|
||||
}
|
||||
@@ -2453,6 +2454,8 @@ impl Timeline {
|
||||
let span = tracing::Span::current();
|
||||
|
||||
// Copy to move into the task we're about to spawn
|
||||
let generation = self.generation;
|
||||
let shard = self.get_shard_index();
|
||||
let this = self.myself.upgrade().expect("&self method holds the arc");
|
||||
|
||||
let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
|
||||
@@ -2466,14 +2469,11 @@ impl Timeline {
|
||||
|
||||
for discovered in discovered {
|
||||
let (name, kind) = match discovered {
|
||||
Discovered::Layer(layer_file_name, local_metadata) => {
|
||||
discovered_layers.push((layer_file_name, local_metadata));
|
||||
Discovered::Layer(layer_file_name, local_path, file_size) => {
|
||||
discovered_layers.push((layer_file_name, local_path, file_size));
|
||||
continue;
|
||||
}
|
||||
Discovered::IgnoredBackup(path) => {
|
||||
std::fs::remove_file(path)
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.fatal_err("Removing .old file");
|
||||
Discovered::IgnoredBackup => {
|
||||
continue;
|
||||
}
|
||||
Discovered::Unknown(file_name) => {
|
||||
@@ -2499,8 +2499,13 @@ impl Timeline {
|
||||
);
|
||||
}
|
||||
|
||||
let decided =
|
||||
init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn);
|
||||
let decided = init::reconcile(
|
||||
discovered_layers,
|
||||
index_part.as_ref(),
|
||||
disk_consistent_lsn,
|
||||
generation,
|
||||
shard,
|
||||
);
|
||||
|
||||
let mut loaded_layers = Vec::new();
|
||||
let mut needs_cleanup = Vec::new();
|
||||
@@ -2508,6 +2513,21 @@ impl Timeline {
|
||||
|
||||
for (name, decision) in decided {
|
||||
let decision = match decision {
|
||||
Ok(UseRemote { local, remote }) => {
|
||||
// Remote is authoritative, but we may still choose to retain
|
||||
// the local file if the contents appear to match
|
||||
if local.metadata.file_size() == remote.file_size() {
|
||||
// Use the local file, but take the remote metadata so that we pick up
|
||||
// the correct generation.
|
||||
UseLocal(LocalLayerFileMetadata {
|
||||
metadata: remote,
|
||||
local_path: local.local_path,
|
||||
})
|
||||
} else {
|
||||
init::cleanup_local_file_for_remote(&local, &remote)?;
|
||||
UseRemote { local, remote }
|
||||
}
|
||||
}
|
||||
Ok(decision) => decision,
|
||||
Err(DismissedLayer::Future { local }) => {
|
||||
if let Some(local) = local {
|
||||
@@ -2525,11 +2545,6 @@ impl Timeline {
|
||||
// this file never existed remotely, we will have to do rework
|
||||
continue;
|
||||
}
|
||||
Err(DismissedLayer::BadMetadata(local)) => {
|
||||
init::cleanup_local_file_for_remote(&local)?;
|
||||
// this file never existed remotely, we will have to do rework
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match &name {
|
||||
@@ -2540,12 +2555,14 @@ impl Timeline {
|
||||
tracing::debug!(layer=%name, ?decision, "applied");
|
||||
|
||||
let layer = match decision {
|
||||
Resident { local, remote } => {
|
||||
total_physical_size += local.file_size;
|
||||
Layer::for_resident(conf, &this, local.local_path, name, remote)
|
||||
UseLocal(local) => {
|
||||
total_physical_size += local.metadata.file_size();
|
||||
Layer::for_resident(conf, &this, local.local_path, name, local.metadata)
|
||||
.drop_eviction_guard()
|
||||
}
|
||||
Evicted(remote) => Layer::for_evicted(conf, &this, name, remote),
|
||||
Evicted(remote) | UseRemote { remote, .. } => {
|
||||
Layer::for_evicted(conf, &this, name, remote)
|
||||
}
|
||||
};
|
||||
|
||||
loaded_layers.push(layer);
|
||||
@@ -3054,7 +3071,7 @@ impl Timeline {
|
||||
|
||||
HeatMapLayer::new(
|
||||
layer.layer_desc().layer_name(),
|
||||
layer.metadata(),
|
||||
(&layer.metadata()).into(),
|
||||
last_activity_ts,
|
||||
)
|
||||
});
|
||||
@@ -3865,7 +3882,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let (layers_to_upload, delta_layer_to_add) = if create_image_layer {
|
||||
let (layers_to_upload, delta_layers_to_add) = if create_image_layer {
|
||||
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
|
||||
// require downloading anything during initial import.
|
||||
let ((rel_partition, metadata_partition), _lsn) = self
|
||||
@@ -3882,26 +3899,23 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// For metadata, always create delta layers.
|
||||
let delta_layer = if !metadata_partition.parts.is_empty() {
|
||||
assert_eq!(
|
||||
metadata_partition.parts.len(),
|
||||
1,
|
||||
"currently sparse keyspace should only contain a single aux file keyspace"
|
||||
);
|
||||
let metadata_keyspace = &metadata_partition.parts[0];
|
||||
assert_eq!(
|
||||
metadata_keyspace.0.ranges.len(),
|
||||
1,
|
||||
"aux file keyspace should be a single range"
|
||||
);
|
||||
self.create_delta_layer(
|
||||
&frozen_layer,
|
||||
Some(metadata_keyspace.0.ranges[0].clone()),
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
let delta_layers = if !metadata_partition.parts.is_empty() {
|
||||
// In the current implementation, the metadata partition will only have one part, and that part will only
// have a single key range. This might change in the future.
|
||||
let mut delta_layers_created = Vec::new();
|
||||
for ks in &metadata_partition.parts {
|
||||
for range in &ks.0.ranges {
|
||||
let layer = self
|
||||
.create_delta_layer(&frozen_layer, Some(range.clone()), ctx)
|
||||
.await?;
|
||||
if let Some(layer) = layer {
|
||||
delta_layers_created.push(layer);
|
||||
}
|
||||
}
|
||||
}
|
||||
delta_layers_created
|
||||
} else {
|
||||
None
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
// For image layers, we add them immediately into the layer map.
|
||||
@@ -3916,12 +3930,8 @@ impl Timeline {
|
||||
.await?,
|
||||
);
|
||||
|
||||
if let Some(delta_layer) = delta_layer {
|
||||
layers_to_upload.push(delta_layer.clone());
|
||||
(layers_to_upload, Some(delta_layer))
|
||||
} else {
|
||||
(layers_to_upload, None)
|
||||
}
|
||||
layers_to_upload.extend(delta_layers.iter().cloned());
|
||||
(layers_to_upload, delta_layers)
|
||||
} else {
|
||||
// Normal case, write out a L0 delta layer file.
|
||||
// `create_delta_layer` will not modify the layer map.
|
||||
@@ -3929,12 +3939,7 @@ impl Timeline {
|
||||
let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
|
||||
panic!("delta layer cannot be empty if no filter is applied");
|
||||
};
|
||||
(
|
||||
// FIXME: even though we have a single image and single delta layer assumption
|
||||
// we push them to vec
|
||||
vec![layer.clone()],
|
||||
Some(layer),
|
||||
)
|
||||
(vec![layer.clone()], vec![layer])
|
||||
};
|
||||
|
||||
pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
|
||||
@@ -3955,7 +3960,7 @@ impl Timeline {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
|
||||
guard.finish_flush_l0_layer(&delta_layers_to_add, &frozen_layer, &self.metrics);
|
||||
|
||||
if self.set_disk_consistent_lsn(disk_consistent_lsn) {
|
||||
// Schedule remote uploads that will reflect our new disk_consistent_lsn
|
||||
@@ -4708,16 +4713,11 @@ impl Timeline {
|
||||
|
||||
async fn rewrite_layers(
|
||||
self: &Arc<Self>,
|
||||
mut replace_layers: Vec<(Layer, ResidentLayer)>,
|
||||
mut drop_layers: Vec<Layer>,
|
||||
replace_layers: Vec<(Layer, ResidentLayer)>,
|
||||
drop_layers: Vec<Layer>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
// Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want
|
||||
// to avoid double-removing, and avoid rewriting something that was removed.
|
||||
replace_layers.retain(|(l, _)| guard.contains(l));
|
||||
drop_layers.retain(|l| guard.contains(l));
|
||||
|
||||
guard.rewrite_layers(&replace_layers, &drop_layers, &self.metrics);
|
||||
|
||||
let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();
|
||||
@@ -5592,6 +5592,26 @@ fn is_send() {
|
||||
_assert_send::<TimelineWriter<'_>>();
|
||||
}
|
||||
|
||||
/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
    let filename = path
        .file_name()
        .ok_or_else(|| anyhow!("Path {path} doesn't have a file name"))?;
    let mut new_path = path.to_owned();

    for i in 0u32.. {
        new_path.set_file_name(format!("{filename}.{i}.old"));
        if !new_path.exists() {
            std::fs::rename(path, &new_path)
                .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
            return Ok(());
        }
    }

    bail!("couldn't find an unused backup number for {:?}", path)
}
|
||||
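The backup-renaming helper above probes `.0.old`, `.1.old`, … until it finds a free name. The following is a minimal std-only sketch of the same probing loop, using `std::path` instead of `camino` and plain `io::Error` instead of `anyhow`; it is purely illustrative, not the function this diff moves.

```rust
use std::io;
use std::path::Path;

/// Rename `path` to `<name>.<n>.old`, picking the first n whose target does not exist.
fn rename_to_backup(path: &Path) -> io::Result<()> {
    let file_name = path
        .file_name()
        .and_then(|n| n.to_str())
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "path has no file name"))?
        .to_owned();
    let mut candidate = path.to_path_buf();
    for i in 0u32.. {
        candidate.set_file_name(format!("{file_name}.{i}.old"));
        if !candidate.exists() {
            return std::fs::rename(path, &candidate);
        }
    }
    unreachable!("u32 range exhausted before finding a free backup name")
}

fn main() -> io::Result<()> {
    // Demo only: create a scratch file in the current directory and back it up.
    std::fs::write("demo.layer", b"example")?;
    rename_to_backup(Path::new("demo.layer"))?;
    assert!(Path::new("demo.layer.0.old").exists());
    std::fs::remove_file("demo.layer.0.old")
}
```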
#[cfg(test)]
|
||||
mod tests {
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
|
||||
@@ -176,24 +176,13 @@ impl Timeline {
|
||||
async fn compact_shard_ancestors(
|
||||
self: &Arc<Self>,
|
||||
rewrite_max: usize,
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut drop_layers = Vec::new();
|
||||
let mut layers_to_rewrite: Vec<Layer> = Vec::new();
|
||||
let layers_to_rewrite: Vec<Layer> = Vec::new();
|
||||
|
||||
// We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
|
||||
// layer is behind this Lsn, it indicates that the layer is being retained beyond the
|
||||
// pitr_interval, for example because a branchpoint references it.
|
||||
//
|
||||
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
|
||||
// are rewriting layers.
|
||||
let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
|
||||
|
||||
tracing::info!(
|
||||
"latest_gc_cutoff: {}, pitr cutoff {}",
|
||||
*latest_gc_cutoff,
|
||||
self.gc_info.read().unwrap().cutoffs.pitr
|
||||
);
|
||||
// We will use the PITR cutoff as a condition for rewriting layers.
|
||||
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.pitr;
|
||||
|
||||
let layers = self.layers.read().await;
|
||||
for layer_desc in layers.layer_map().iter_historic_layers() {
|
||||
@@ -252,9 +241,9 @@ impl Timeline {
|
||||
|
||||
// Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
|
||||
// without incurring the I/O cost of a rewrite.
|
||||
if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
|
||||
debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
|
||||
layer_desc.get_lsn_range().end, *latest_gc_cutoff);
|
||||
if layer_desc.get_lsn_range().end >= pitr_cutoff {
|
||||
debug!(%layer, "Skipping rewrite of layer still in PITR window ({} >= {})",
|
||||
layer_desc.get_lsn_range().end, pitr_cutoff);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -264,10 +253,13 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only rewrite layers if their generations differ. This guarantees:
|
||||
// - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
|
||||
// - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
|
||||
if layer.metadata().generation == self.generation {
|
||||
// Only rewrite layers if they would have different remote paths: either they belong to this
|
||||
// shard but an old generation, or they belonged to another shard. This also implicitly
|
||||
// guarantees that the layer is persistent in remote storage (as only remote persistent
|
||||
// layers are carried across shard splits, any local-only layer would be in the current generation)
|
||||
if layer.metadata().generation == self.generation
|
||||
&& layer.metadata().shard.shard_count == self.shard_identity.count
|
||||
{
|
||||
debug!(%layer, "Skipping rewrite, is not from old generation");
|
||||
continue;
|
||||
}
|
||||
@@ -280,69 +272,18 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Fall through: all our conditions for doing a rewrite passed.
|
||||
layers_to_rewrite.push(layer);
|
||||
// TODO: implement rewriting
|
||||
tracing::debug!(%layer, "Would rewrite layer");
|
||||
}
|
||||
|
||||
// Drop read lock on layer map before we start doing time-consuming I/O
|
||||
// Drop the layers read lock: we will acquire it for write in [`Self::rewrite_layers`]
|
||||
drop(layers);
|
||||
|
||||
let mut replace_image_layers = Vec::new();
|
||||
|
||||
for layer in layers_to_rewrite {
|
||||
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&layer.layer_desc().key_range,
|
||||
layer.layer_desc().image_layer_lsn(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Safety of layer rewrites:
|
||||
// - We are writing to a different local file path than we are reading from, so the old Layer
|
||||
// cannot interfere with the new one.
|
||||
// - In the page cache, contents for a particular VirtualFile are stored with a file_id that
|
||||
// is different for two layers with the same name (in `ImageLayerInner::new` we always
|
||||
// acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
|
||||
// reading the index from one layer file, and then data blocks from the rewritten layer file.
|
||||
// - Any readers that have a reference to the old layer will keep it alive until they are done
|
||||
// with it. If they are trying to promote from remote storage, that will fail, but this is the same
|
||||
// as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
|
||||
// - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
|
||||
// - GC, which at worst witnesses us "undelete" a layer that they just deleted.
|
||||
// - ingestion, which only inserts layers, therefore cannot collide with us.
|
||||
let resident = layer.download_and_keep_resident().await?;
|
||||
|
||||
let keys_written = resident
|
||||
.filter(&self.shard_identity, &mut image_layer_writer, ctx)
|
||||
.await?;
|
||||
|
||||
if keys_written > 0 {
|
||||
let new_layer = image_layer_writer.finish(self, ctx).await?;
|
||||
tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
|
||||
layer.metadata().file_size,
|
||||
new_layer.metadata().file_size);
|
||||
|
||||
replace_image_layers.push((layer, new_layer));
|
||||
} else {
|
||||
// Drop the old layer. Usually for this case we would already have noticed that
|
||||
// the layer has no data for us with the ShardedRange check above, but
|
||||
drop_layers.push(layer);
|
||||
}
|
||||
}
|
||||
|
||||
// At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
|
||||
// metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
|
||||
// to remote index) and be removed. This is inefficient but safe.
|
||||
fail::fail_point!("compact-shard-ancestors-localonly");
|
||||
// TODO: collect layers to rewrite
|
||||
let replace_layers = Vec::new();
|
||||
|
||||
// Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
|
||||
self.rewrite_layers(replace_image_layers, drop_layers)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("compact-shard-ancestors-enqueued");
|
||||
self.rewrite_layers(replace_layers, drop_layers).await?;
|
||||
|
||||
// We wait for all uploads to complete before finishing this compaction stage. This is not
|
||||
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
|
||||
@@ -350,8 +291,6 @@ impl Timeline {
|
||||
// load.
|
||||
self.remote_client.wait_completion().await?;
|
||||
|
||||
fail::fail_point!("compact-shard-ancestors-persistent");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ use anyhow::Context;
|
||||
use pageserver_api::{models::TimelineState, shard::TenantShardId};
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tracing::{error, info, instrument, Instrument};
|
||||
use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
|
||||
use utils::{crashsafe, fs_ext, id::TimelineId};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
|
||||
@@ -7,20 +7,19 @@ use crate::{
|
||||
index::{IndexPart, LayerFileMetadata},
|
||||
},
|
||||
storage_layer::LayerName,
|
||||
Generation,
|
||||
},
|
||||
};
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use std::{
|
||||
collections::{hash_map, HashMap},
|
||||
str::FromStr,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// Identified files in the timeline directory.
|
||||
pub(super) enum Discovered {
|
||||
/// The only one we care about
|
||||
Layer(LayerName, LocalLayerFileMetadata),
|
||||
Layer(LayerName, Utf8PathBuf, u64),
|
||||
/// Old ephemeral files from previous launches, should be removed
|
||||
Ephemeral(String),
|
||||
/// Old temporary timeline files, unsure what these really are, should be removed
|
||||
@@ -28,7 +27,7 @@ pub(super) enum Discovered {
|
||||
/// Temporary on-demand download files, should be removed
|
||||
TemporaryDownload(String),
|
||||
/// Backup file from previously future layers
|
||||
IgnoredBackup(Utf8PathBuf),
|
||||
IgnoredBackup,
|
||||
/// Unrecognized, warn about these
|
||||
Unknown(String),
|
||||
}
|
||||
@@ -44,15 +43,12 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
|
||||
let discovered = match LayerName::from_str(&file_name) {
|
||||
Ok(file_name) => {
|
||||
let file_size = direntry.metadata()?.len();
|
||||
Discovered::Layer(
|
||||
file_name,
|
||||
LocalLayerFileMetadata::new(direntry.path().to_owned(), file_size),
|
||||
)
|
||||
Discovered::Layer(file_name, direntry.path().to_owned(), file_size)
|
||||
}
|
||||
Err(_) => {
|
||||
if file_name.ends_with(".old") {
|
||||
// ignore these
|
||||
Discovered::IgnoredBackup(direntry.path().to_owned())
|
||||
Discovered::IgnoredBackup
|
||||
} else if remote_timeline_client::is_temp_download_file(direntry.path()) {
|
||||
Discovered::TemporaryDownload(file_name)
|
||||
} else if is_ephemeral_file(&file_name) {
|
||||
@@ -75,32 +71,37 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
|
||||
/// this structure extends it with metadata describing the layer's presence in local storage.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(super) struct LocalLayerFileMetadata {
|
||||
pub(super) file_size: u64,
|
||||
pub(super) metadata: LayerFileMetadata,
|
||||
pub(super) local_path: Utf8PathBuf,
|
||||
}
|
||||
|
||||
impl LocalLayerFileMetadata {
|
||||
pub fn new(local_path: Utf8PathBuf, file_size: u64) -> Self {
|
||||
pub fn new(
|
||||
local_path: Utf8PathBuf,
|
||||
file_size: u64,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Self {
|
||||
Self {
|
||||
local_path,
|
||||
file_size,
|
||||
metadata: LayerFileMetadata::new(file_size, generation, shard),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// For a layer that is present in remote metadata, this type describes how to handle
|
||||
/// it during startup: it is either Resident (and we have some metadata about a local file),
|
||||
/// or it is Evicted (and we only have remote metadata).
|
||||
/// Decision on what to do with a layer file after considering its local and remote metadata.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(super) enum Decision {
|
||||
/// The layer is not present locally.
|
||||
Evicted(LayerFileMetadata),
|
||||
/// The layer is present locally, and metadata matches: we may hook up this layer to the
|
||||
/// existing file in local storage.
|
||||
Resident {
|
||||
/// The layer is present locally, but local metadata does not match remote; we must
|
||||
/// delete it and treat it as evicted.
|
||||
UseRemote {
|
||||
local: LocalLayerFileMetadata,
|
||||
remote: LayerFileMetadata,
|
||||
},
|
||||
/// The layer is present locally, and metadata matches.
|
||||
UseLocal(LocalLayerFileMetadata),
|
||||
}
|
||||
|
||||
/// A layer needs to be left out of the layer map.
|
||||
@@ -116,81 +117,77 @@ pub(super) enum DismissedLayer {
|
||||
/// In order to make crash safe updates to layer map, we must dismiss layers which are only
|
||||
/// found locally or not yet included in the remote `index_part.json`.
|
||||
LocalOnly(LocalLayerFileMetadata),
|
||||
|
||||
/// The layer exists in remote storage but the local layer's metadata (e.g. file size)
|
||||
/// does not match it
|
||||
BadMetadata(LocalLayerFileMetadata),
|
||||
}
|
||||
|
||||
/// Merges local discoveries and remote [`IndexPart`] to a collection of decisions.
|
||||
pub(super) fn reconcile(
|
||||
local_layers: Vec<(LayerName, LocalLayerFileMetadata)>,
|
||||
discovered: Vec<(LayerName, Utf8PathBuf, u64)>,
|
||||
index_part: Option<&IndexPart>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Vec<(LayerName, Result<Decision, DismissedLayer>)> {
|
||||
let Some(index_part) = index_part else {
|
||||
// If we have no remote metadata, no local layer files are considered valid to load
|
||||
return local_layers
|
||||
.into_iter()
|
||||
.map(|(layer_name, local_metadata)| {
|
||||
(layer_name, Err(DismissedLayer::LocalOnly(local_metadata)))
|
||||
})
|
||||
.collect();
|
||||
};
|
||||
use Decision::*;
|
||||
|
||||
let mut result = Vec::new();
|
||||
// name => (local_metadata, remote_metadata)
|
||||
type Collected =
|
||||
HashMap<LayerName, (Option<LocalLayerFileMetadata>, Option<LayerFileMetadata>)>;
|
||||
|
||||
let mut remote_layers = HashMap::new();
|
||||
let mut discovered = discovered
|
||||
.into_iter()
|
||||
.map(|(layer_name, local_path, file_size)| {
|
||||
(
|
||||
layer_name,
|
||||
// The generation and shard here will be corrected to match IndexPart in the merge below, unless
|
||||
// it is not in IndexPart, in which case using our current generation makes sense
|
||||
// because it will be uploaded in this generation.
|
||||
(
|
||||
Some(LocalLayerFileMetadata::new(
|
||||
local_path, file_size, generation, shard,
|
||||
)),
|
||||
None,
|
||||
),
|
||||
)
|
||||
})
|
||||
.collect::<Collected>();
|
||||
|
||||
// Construct Decisions for layers that are found locally, if they're in remote metadata. Otherwise
|
||||
// construct DismissedLayers to get rid of them.
|
||||
for (layer_name, local_metadata) in local_layers {
|
||||
let Some(remote_metadata) = index_part.layer_metadata.get(&layer_name) else {
|
||||
result.push((layer_name, Err(DismissedLayer::LocalOnly(local_metadata))));
|
||||
continue;
|
||||
};
|
||||
|
||||
if remote_metadata.file_size != local_metadata.file_size {
|
||||
result.push((layer_name, Err(DismissedLayer::BadMetadata(local_metadata))));
|
||||
continue;
|
||||
}
|
||||
|
||||
remote_layers.insert(
|
||||
layer_name,
|
||||
Decision::Resident {
|
||||
local: local_metadata,
|
||||
remote: remote_metadata.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Construct Decision for layers that were not found locally
|
||||
// merge any index_part information, when available
|
||||
index_part
|
||||
.layer_metadata
|
||||
.iter()
|
||||
.as_ref()
|
||||
.map(|ip| ip.layer_metadata.iter())
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(|(name, metadata)| (name, LayerFileMetadata::from(metadata)))
|
||||
.for_each(|(name, metadata)| {
|
||||
if let hash_map::Entry::Vacant(entry) = remote_layers.entry(name.clone()) {
|
||||
entry.insert(Decision::Evicted(metadata.clone()));
|
||||
if let Some(existing) = discovered.get_mut(name) {
|
||||
existing.1 = Some(metadata);
|
||||
} else {
|
||||
discovered.insert(name.to_owned(), (None, Some(metadata)));
|
||||
}
|
||||
});
|
||||
|
||||
// For layers that were found in authoritative remote metadata, apply a final check that they are within
|
||||
// the disk_consistent_lsn.
|
||||
result.extend(remote_layers.into_iter().map(|(name, decision)| {
|
||||
if name.is_in_future(disk_consistent_lsn) {
|
||||
match decision {
|
||||
Decision::Evicted(_remote) => (name, Err(DismissedLayer::Future { local: None })),
|
||||
Decision::Resident {
|
||||
local,
|
||||
remote: _remote,
|
||||
} => (name, Err(DismissedLayer::Future { local: Some(local) })),
|
||||
}
|
||||
} else {
|
||||
(name, Ok(decision))
|
||||
}
|
||||
}));
|
||||
discovered
|
||||
.into_iter()
|
||||
.map(|(name, (local, remote))| {
|
||||
let decision = if name.is_in_future(disk_consistent_lsn) {
|
||||
Err(DismissedLayer::Future { local })
|
||||
} else {
|
||||
match (local, remote) {
|
||||
(Some(local), Some(remote)) if local.metadata != remote => {
|
||||
Ok(UseRemote { local, remote })
|
||||
}
|
||||
(Some(x), Some(_)) => Ok(UseLocal(x)),
|
||||
(None, Some(x)) => Ok(Evicted(x)),
|
||||
(Some(x), None) => Err(DismissedLayer::LocalOnly(x)),
|
||||
(None, None) => {
|
||||
unreachable!("there must not be any non-local non-remote files")
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
result
|
||||
(name, decision)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
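The merge in `reconcile` above reduces to a four-way match on (local, remote) presence plus an equality check on the metadata, with an overriding dismissal for layers past the disk-consistent LSN. A self-contained sketch of just that decision table follows; the enum names mirror this file, but `Meta` is a simplified stand-in, not the real `LayerFileMetadata`.

```rust
#[derive(Clone, Debug, PartialEq)]
struct Meta {
    file_size: u64,
}

#[derive(Debug)]
enum Decision {
    Evicted(Meta),
    UseRemote { local: Meta, remote: Meta },
    UseLocal(Meta),
}

#[derive(Debug)]
enum Dismissed {
    LocalOnly(Meta),
    Future { local: Option<Meta> },
}

/// Mirrors the core of `reconcile`: a layer in the future of the disk-consistent
/// LSN is dismissed, otherwise the decision depends on which sides know about it
/// and whether the two metadata records agree.
fn decide(
    in_future: bool,
    local: Option<Meta>,
    remote: Option<Meta>,
) -> Result<Decision, Dismissed> {
    if in_future {
        return Err(Dismissed::Future { local });
    }
    match (local, remote) {
        (Some(local), Some(remote)) if local != remote => Ok(Decision::UseRemote { local, remote }),
        (Some(local), Some(_)) => Ok(Decision::UseLocal(local)),
        (None, Some(remote)) => Ok(Decision::Evicted(remote)),
        (Some(local), None) => Err(Dismissed::LocalOnly(local)),
        (None, None) => unreachable!("every entry came from at least one side"),
    }
}

fn main() {
    let a = Meta { file_size: 10 };
    let b = Meta { file_size: 20 };
    println!("{:?}", decide(false, Some(a.clone()), Some(a.clone()))); // UseLocal
    println!("{:?}", decide(false, Some(a.clone()), Some(b.clone()))); // UseRemote
    println!("{:?}", decide(false, None, Some(b)));                    // Evicted
    println!("{:?}", decide(true, Some(a), None));                     // Future
}
```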
pub(super) fn cleanup(path: &Utf8Path, kind: &str) -> anyhow::Result<()> {
|
||||
@@ -199,15 +196,25 @@ pub(super) fn cleanup(path: &Utf8Path, kind: &str) -> anyhow::Result<()> {
|
||||
std::fs::remove_file(path).with_context(|| format!("failed to remove {kind} at {path}"))
|
||||
}
|
||||
|
||||
pub(super) fn cleanup_local_file_for_remote(local: &LocalLayerFileMetadata) -> anyhow::Result<()> {
|
||||
let local_size = local.file_size;
|
||||
pub(super) fn cleanup_local_file_for_remote(
|
||||
local: &LocalLayerFileMetadata,
|
||||
remote: &LayerFileMetadata,
|
||||
) -> anyhow::Result<()> {
|
||||
let local_size = local.metadata.file_size();
|
||||
let remote_size = remote.file_size();
|
||||
let path = &local.local_path;
|
||||
let file_name = path.file_name().expect("must be file path");
|
||||
tracing::warn!(
|
||||
"removing local file {file_name:?} because it has unexpected length {local_size};"
|
||||
);
|
||||
|
||||
std::fs::remove_file(path).with_context(|| format!("failed to remove layer at {path}"))
|
||||
let file_name = path.file_name().expect("must be file path");
|
||||
tracing::warn!("removing local file {file_name:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
|
||||
if let Err(err) = crate::tenant::timeline::rename_to_backup(path) {
|
||||
assert!(
|
||||
path.exists(),
|
||||
"we would leave the local_layer without a file if this does not hold: {path}",
|
||||
);
|
||||
Err(err)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn cleanup_future_layer(
|
||||
@@ -229,8 +236,8 @@ pub(super) fn cleanup_local_only_file(
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = name.kind();
|
||||
tracing::info!(
|
||||
"found local-only {kind} layer {name} size {}",
|
||||
local.file_size
|
||||
"found local-only {kind} layer {name}, metadata {:?}",
|
||||
local.metadata
|
||||
);
|
||||
std::fs::remove_file(&local.local_path)?;
|
||||
Ok(())
|
||||
|
||||
@@ -166,7 +166,7 @@ impl LayerManager {
|
||||
/// Flush a frozen layer and add the written delta layer to the layer map.
|
||||
pub(crate) fn finish_flush_l0_layer(
|
||||
&mut self,
|
||||
delta_layer: Option<&ResidentLayer>,
|
||||
delta_layers: &[ResidentLayer],
|
||||
frozen_layer_for_check: &Arc<InMemoryLayer>,
|
||||
metrics: &TimelineMetrics,
|
||||
) {
|
||||
@@ -181,10 +181,12 @@ impl LayerManager {
|
||||
// layer to disk at the same time, that would not work.
|
||||
assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
|
||||
|
||||
if let Some(l) = delta_layer {
|
||||
if !delta_layers.is_empty() {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
for l in delta_layers {
|
||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
}
|
||||
updates.flush();
|
||||
}
|
||||
}
|
||||
@@ -212,34 +214,13 @@ impl LayerManager {
|
||||
&mut self,
|
||||
rewrite_layers: &[(Layer, ResidentLayer)],
|
||||
drop_layers: &[Layer],
|
||||
metrics: &TimelineMetrics,
|
||||
_metrics: &TimelineMetrics,
|
||||
) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for (old_layer, new_layer) in rewrite_layers {
|
||||
debug_assert_eq!(
|
||||
old_layer.layer_desc().key_range,
|
||||
new_layer.layer_desc().key_range
|
||||
);
|
||||
debug_assert_eq!(
|
||||
old_layer.layer_desc().lsn_range,
|
||||
new_layer.layer_desc().lsn_range
|
||||
);
|
||||
|
||||
// Safety: we may never rewrite the same file in-place. Callers are responsible
|
||||
// for ensuring that they only rewrite layers after something changes the path,
|
||||
// such as an increment in the generation number.
|
||||
assert_ne!(old_layer.local_path(), new_layer.local_path());
|
||||
// TODO: implement rewrites (currently this code path only used for drops)
|
||||
assert!(rewrite_layers.is_empty());
|
||||
|
||||
Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
|
||||
|
||||
Self::insert_historic_layer(
|
||||
new_layer.as_ref().clone(),
|
||||
&mut updates,
|
||||
&mut self.layer_fmgr,
|
||||
);
|
||||
|
||||
metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
|
||||
}
|
||||
for l in drop_layers {
|
||||
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
|
||||
@@ -213,7 +213,10 @@ impl UploadQueue {
|
||||
|
||||
let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
|
||||
for (layer_name, layer_metadata) in &index_part.layer_metadata {
|
||||
files.insert(layer_name.to_owned(), layer_metadata.clone());
|
||||
files.insert(
|
||||
layer_name.to_owned(),
|
||||
LayerFileMetadata::from(layer_metadata),
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
@@ -319,7 +322,9 @@ impl std::fmt::Display for UploadOp {
|
||||
write!(
|
||||
f,
|
||||
"UploadLayer({}, size={:?}, gen={:?})",
|
||||
layer, metadata.file_size, metadata.generation
|
||||
layer,
|
||||
metadata.file_size(),
|
||||
metadata.generation
|
||||
)
|
||||
}
|
||||
UploadOp::UploadMetadata(_, lsn) => {
|
||||
|
||||
@@ -344,21 +344,21 @@ macro_rules! with_file {
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub async fn open<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
pub async fn open(
|
||||
path: &Utf8Path,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
Self::open_with_options(path, OpenOptions::new().read(true), ctx).await
|
||||
}
|
||||
|
||||
/// Create a new file for writing. If the file exists, it will be truncated.
|
||||
/// Like File::create.
|
||||
pub async fn create<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
pub async fn create(
|
||||
path: &Utf8Path,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
Self::open_with_options(
|
||||
path.as_ref(),
|
||||
path,
|
||||
OpenOptions::new().write(true).create(true).truncate(true),
|
||||
ctx,
|
||||
)
|
||||
@@ -370,13 +370,12 @@ impl VirtualFile {
|
||||
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
|
||||
/// they will be applied also when the file is subsequently re-opened, not only
|
||||
/// on the first time. Make sure that's sane!
|
||||
pub async fn open_with_options<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
pub async fn open_with_options(
|
||||
path: &Utf8Path,
|
||||
open_options: &OpenOptions,
|
||||
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
let path_ref = path.as_ref();
|
||||
let path_str = path_ref.to_string();
|
||||
let path_str = path.to_string();
|
||||
let parts = path_str.split('/').collect::<Vec<&str>>();
|
||||
let (tenant_id, shard_id, timeline_id) =
|
||||
if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
|
||||
@@ -402,7 +401,7 @@ impl VirtualFile {
|
||||
// where our caller doesn't get to use the returned VirtualFile before its
|
||||
// slot gets re-used by someone else.
|
||||
let file = observe_duration!(StorageIoOperation::Open, {
|
||||
open_options.open(path_ref.as_std_path()).await?
|
||||
open_options.open(path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Strip all options other than read and write.
|
||||
@@ -418,7 +417,7 @@ impl VirtualFile {
|
||||
let vfile = VirtualFile {
|
||||
handle: RwLock::new(handle),
|
||||
pos: 0,
|
||||
path: path_ref.to_path_buf(),
|
||||
path: path.to_path_buf(),
|
||||
open_options: reopen_options,
|
||||
tenant_id,
|
||||
shard_id,
|
||||
|
||||
@@ -51,6 +51,7 @@ int flush_every_n_requests = 8;
|
||||
|
||||
int neon_protocol_version = 2;
|
||||
|
||||
static int n_reconnect_attempts = 0;
|
||||
static int max_reconnect_attempts = 60;
|
||||
static int stripe_size;
|
||||
|
||||
@@ -94,37 +95,18 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
static PagestoreShmemState *pagestore_shared;
|
||||
static uint64 pagestore_local_counter = 0;
|
||||
|
||||
typedef enum PSConnectionState {
|
||||
PS_Disconnected, /* no connection yet */
|
||||
PS_Connecting_Startup, /* connection starting up */
|
||||
PS_Connecting_PageStream, /* negotiating pagestream */
|
||||
PS_Connected, /* connected, pagestream established */
|
||||
} PSConnectionState;
|
||||
|
||||
/* This backend's per-shard connections */
|
||||
typedef struct
|
||||
{
|
||||
TimestampTz last_connect_time; /* read-only debug value */
|
||||
TimestampTz last_reconnect_time;
|
||||
uint32 delay_us;
|
||||
int n_reconnect_attempts;
|
||||
PGconn *conn;
|
||||
|
||||
/*---
|
||||
* Pageserver connection state, i.e.
|
||||
* disconnected: conn == NULL, wes == NULL;
|
||||
* conn_startup: connection initiated, waiting for connection establishing
|
||||
* conn_ps: PageStream query sent, waiting for confirmation
|
||||
* connected: PageStream established
|
||||
*/
|
||||
PSConnectionState state;
|
||||
PGconn *conn;
|
||||
/*---
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_READABLE on 'conn'
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
* - WL_SOCKET_READABLE on 'conn'
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *wes_read;
|
||||
WaitEventSet *wes;
|
||||
} PageServer;
|
||||
|
||||
static PageServer page_servers[MAX_SHARDS];
|
||||
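The per-shard fields above (`last_reconnect_time`, `delay_us`) drive a capped exponential backoff in `pageserver_connect`: sleep only for the part of the delay that has not already elapsed since the previous attempt, then double the delay up to a ceiling. Below is a small Rust sketch of that policy; the bounds are made up, since the real MIN/MAX_RECONNECT_INTERVAL_USEC values are not shown in this diff.

```rust
use std::time::{Duration, Instant};

const MIN_RECONNECT_INTERVAL: Duration = Duration::from_millis(10); // assumed bound
const MAX_RECONNECT_INTERVAL: Duration = Duration::from_secs(2); // assumed bound

struct Backoff {
    last_attempt: Option<Instant>,
    delay: Duration,
}

impl Backoff {
    fn new() -> Self {
        Self {
            last_attempt: None,
            delay: MIN_RECONNECT_INTERVAL,
        }
    }

    /// Wait before the next connection attempt, crediting time already spent
    /// doing other work since the previous attempt, then grow the delay.
    fn before_attempt(&mut self) {
        let now = Instant::now();
        if let Some(last) = self.last_attempt {
            let since_last = now.duration_since(last);
            if since_last < self.delay {
                std::thread::sleep(self.delay - since_last);
            }
        }
        self.last_attempt = Some(Instant::now());
        self.delay = (self.delay * 2).min(MAX_RECONNECT_INTERVAL);
    }

    /// On success the delay resets so the next failure starts from the minimum again.
    fn on_success(&mut self) {
        self.delay = MIN_RECONNECT_INTERVAL;
    }
}

fn main() {
    let mut backoff = Backoff::new();
    for attempt in 1..=4 {
        backoff.before_attempt();
        println!("attempt {attempt}, next delay capped at {:?}", backoff.delay);
    }
    backoff.on_success();
}
```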
@@ -321,277 +303,119 @@ get_shard_number(BufferTag *tag)
|
||||
return hash % n_shards;
|
||||
}
|
||||
|
||||
static inline void
|
||||
CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
{
|
||||
if (shard->wes_read)
|
||||
{
|
||||
FreeWaitEventSet(shard->wes_read);
|
||||
shard->wes_read = NULL;
|
||||
}
|
||||
if (shard->conn)
|
||||
{
|
||||
PQfinish(shard->conn);
|
||||
shard->conn = NULL;
|
||||
}
|
||||
|
||||
shard->state = PS_Disconnected;
|
||||
}
|
||||
|
||||
/*
|
||||
* Connect to a pageserver, or continue to try to connect if we're yet to
|
||||
* complete the connection (e.g. due to receiving an earlier cancellation
|
||||
* during connection start).
|
||||
* Returns true if successfully connected; false if the connection failed.
|
||||
*
|
||||
* Throws errors in unrecoverable situations, or when this backend's query
|
||||
* is canceled.
|
||||
*/
|
||||
static bool
|
||||
pageserver_connect(shardno_t shard_no, int elevel)
|
||||
{
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
char *query;
|
||||
int ret;
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n;
|
||||
PGconn *conn;
|
||||
WaitEventSet *wes;
|
||||
char connstr[MAX_PAGESERVER_CONNSTRING_SIZE];
|
||||
|
||||
static TimestampTz last_connect_time = 0;
|
||||
static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC;
|
||||
TimestampTz now;
|
||||
uint64_t us_since_last_connect;
|
||||
bool broke_from_loop = false;
|
||||
|
||||
Assert(page_servers[shard_no].conn == NULL);
|
||||
|
||||
/*
|
||||
* Get the connection string for this shard. If the shard map has been
|
||||
* updated since we last looked, this will also disconnect any existing
|
||||
* pageserver connections as a side effect.
|
||||
* Note that connstr is used both during connection start, and when we
|
||||
* log the successful connection.
|
||||
*/
|
||||
load_shard_map(shard_no, connstr, NULL);
|
||||
|
||||
switch (shard->state)
|
||||
now = GetCurrentTimestamp();
|
||||
us_since_last_connect = now - last_connect_time;
|
||||
if (us_since_last_connect < MAX_RECONNECT_INTERVAL_USEC)
|
||||
{
|
||||
case PS_Disconnected:
|
||||
{
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n_pgsql_params;
|
||||
TimestampTz now;
|
||||
int64 us_since_last_attempt;
|
||||
|
||||
/* Make sure we start with a clean slate */
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
neon_shard_log(shard_no, DEBUG5, "Connection state: Disconnected");
|
||||
|
||||
now = GetCurrentTimestamp();
|
||||
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
|
||||
shard->last_reconnect_time = now;
|
||||
|
||||
/*
|
||||
* If we did other tasks between reconnect attempts, then we won't
|
||||
* need to wait as long as a full delay.
|
||||
*/
|
||||
if (us_since_last_attempt < shard->delay_us)
|
||||
{
|
||||
pg_usleep(shard->delay_us - us_since_last_attempt);
|
||||
}
|
||||
|
||||
/* update the delay metric */
|
||||
shard->delay_us = Min(shard->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);
|
||||
|
||||
/*
|
||||
* Connect using the connection string we got from the
|
||||
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
|
||||
* variable was set, use that as the password.
|
||||
*
|
||||
* The connection options are parsed in the order they're given, so when
|
||||
* we set the password before the connection string, the connection string
|
||||
* can override the password from the env variable. Seems useful, although
|
||||
* we don't currently use that capability anywhere.
|
||||
*/
|
||||
keywords[0] = "dbname";
|
||||
values[0] = connstr;
|
||||
n_pgsql_params = 1;
|
||||
|
||||
if (neon_auth_token)
|
||||
{
|
||||
keywords[1] = "password";
|
||||
values[1] = neon_auth_token;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
keywords[n_pgsql_params] = NULL;
|
||||
values[n_pgsql_params] = NULL;
|
||||
|
||||
shard->conn = PQconnectStartParams(keywords, values, 1);
|
||||
if (!shard->conn)
|
||||
{
|
||||
neon_shard_log(shard_no, elevel, "Failed to connect to pageserver: out of memory");
|
||||
return false;
|
||||
}
|
||||
|
||||
shard->state = PS_Connecting_Startup;
|
||||
/* fallthrough */
|
||||
pg_usleep(delay_us);
|
||||
delay_us *= 2;
|
||||
}
|
||||
case PS_Connecting_Startup:
|
||||
else
|
||||
{
|
||||
char *pagestream_query;
|
||||
int ps_send_query_ret;
|
||||
bool connected = false;
|
||||
int poll_result = PGRES_POLLING_WRITING;
|
||||
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_Startup");
|
||||
delay_us = MIN_RECONNECT_INTERVAL_USEC;
|
||||
}
|
||||
|
||||
do
|
||||
{
|
||||
WaitEvent event;
|
||||
/*
|
||||
* Connect using the connection string we got from the
|
||||
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
|
||||
* variable was set, use that as the password.
|
||||
*
|
||||
* The connection options are parsed in the order they're given, so when
|
||||
* we set the password before the connection string, the connection string
|
||||
* can override the password from the env variable. Seems useful, although
|
||||
* we don't currently use that capability anywhere.
|
||||
*/
|
||||
n = 0;
|
||||
if (neon_auth_token)
|
||||
{
|
||||
keywords[n] = "password";
|
||||
values[n] = neon_auth_token;
|
||||
n++;
|
||||
}
|
||||
keywords[n] = "dbname";
|
||||
values[n] = connstr;
|
||||
n++;
|
||||
keywords[n] = NULL;
|
||||
values[n] = NULL;
|
||||
n++;
|
||||
conn = PQconnectdbParams(keywords, values, 1);
|
||||
last_connect_time = GetCurrentTimestamp();
|
||||
|
||||
switch (poll_result)
|
||||
{
|
||||
default: /* unknown/unused states are handled as a failed connection */
|
||||
case PGRES_POLLING_FAILED:
|
||||
{
|
||||
char *pqerr = PQerrorMessage(shard->conn);
|
||||
char *msg = NULL;
|
||||
neon_shard_log(shard_no, DEBUG5, "POLLING_FAILED");
|
||||
if (PQstatus(conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(conn));
|
||||
|
||||
if (pqerr)
|
||||
msg = pchomp(pqerr);
|
||||
PQfinish(conn);
|
||||
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
if (msg)
|
||||
{
|
||||
neon_shard_log(shard_no, elevel,
|
||||
"could not connect to pageserver: %s",
|
||||
msg);
|
||||
pfree(msg);
|
||||
}
|
||||
else
|
||||
neon_shard_log(shard_no, elevel,
|
||||
"could not connect to pageserver");
|
||||
|
||||
return false;
|
||||
}
|
||||
case PGRES_POLLING_READING:
|
||||
/* Sleep until there's something to do */
|
||||
while (true)
|
||||
{
|
||||
int rc = WaitLatchOrSocket(MyLatch,
|
||||
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE,
|
||||
PQsocket(shard->conn),
|
||||
0,
|
||||
PG_WAIT_EXTENSION);
|
||||
elog(DEBUG5, "PGRES_POLLING_READING=>%d", rc);
|
||||
if (rc & WL_LATCH_SET)
|
||||
{
|
||||
ResetLatch(MyLatch);
|
||||
/* query cancellation, backend shutdown */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
if (rc & WL_SOCKET_READABLE)
|
||||
break;
|
||||
}
|
||||
/* PQconnectPoll() handles the socket polling state updates */
|
||||
|
||||
break;
|
||||
case PGRES_POLLING_WRITING:
|
||||
/* Sleep until there's something to do */
|
||||
while (true)
|
||||
{
|
||||
int rc = WaitLatchOrSocket(MyLatch,
|
||||
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_WRITEABLE,
|
||||
PQsocket(shard->conn),
|
||||
0,
|
||||
PG_WAIT_EXTENSION);
|
||||
elog(DEBUG5, "PGRES_POLLING_WRITING=>%d", rc);
|
||||
if (rc & WL_LATCH_SET)
|
||||
{
|
||||
ResetLatch(MyLatch);
|
||||
/* query cancellation, backend shutdown */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
if (rc & WL_SOCKET_WRITEABLE)
|
||||
break;
|
||||
}
|
||||
/* PQconnectPoll() handles the socket polling state updates */
|
||||
|
||||
break;
|
||||
case PGRES_POLLING_OK:
|
||||
neon_shard_log(shard_no, DEBUG5, "POLLING_OK");
|
||||
connected = true;
|
||||
break;
|
||||
}
|
||||
poll_result = PQconnectPoll(shard->conn);
|
||||
elog(DEBUG5, "PQconnectPoll=>%d", poll_result);
|
||||
}
|
||||
while (!connected);
|
||||
|
||||
/* No more polling needed; connection succeeded */
|
||||
shard->last_connect_time = GetCurrentTimestamp();
|
||||
|
||||
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);

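The wait-event set built above is used later to block until either the pageserver socket becomes readable or the backend latch fires. A hypothetical helper showing the intended usage pattern, mirroring the loops elsewhere in this file:

/* Block until the pageserver socket is readable, servicing latch wake-ups
 * (query cancellation, backend shutdown) along the way. */
static void
wait_until_socket_readable(WaitEventSet *set)
{
	for (;;)
	{
		WaitEvent	event;

		(void) WaitEventSetWait(set, -1L /* no timeout */, &event, 1, PG_WAIT_EXTENSION);
		if (event.events & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
		if (event.events & WL_SOCKET_READABLE)
			return;
	}
}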
switch (neon_protocol_version)
{
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
errdetail_internal("%s", msg)));
pfree(msg);
return false;
}
switch (neon_protocol_version)
{
case 2:
pagestream_query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
break;
case 1:
pagestream_query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
break;
default:
elog(ERROR, "unexpected neon_protocol_version %d", neon_protocol_version);
}
|
||||
|
||||
if (PQstatus(shard->conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(shard->conn));
|
||||
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
ereport(elevel,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
|
||||
errdetail_internal("%s", msg)));
|
||||
pfree(msg);
|
||||
return false;
|
||||
}
|
||||
|
||||
ps_send_query_ret = PQsendQuery(shard->conn, pagestream_query);
|
||||
pfree(pagestream_query);
|
||||
if (ps_send_query_ret != 1)
|
||||
{
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
|
||||
return false;
|
||||
}
|
||||
|
||||
shard->state = PS_Connecting_PageStream;
|
||||
/* fallthrough */
|
||||
}
|
||||
case PS_Connecting_PageStream:
|
||||
ret = PQsendQuery(conn, query);
|
||||
pfree(query);
|
||||
if (ret != 1)
|
||||
{
|
||||
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_PageStream");
|
||||
PQfinish(conn);
|
||||
neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (PQstatus(shard->conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(shard->conn));
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
ereport(elevel,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
|
||||
errdetail_internal("%s", msg)));
|
||||
pfree(msg);
|
||||
return false;
|
||||
}
|
||||
wes = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
|
||||
|
||||
while (PQisBusy(shard->conn))
|
||||
PG_TRY();
|
||||
{
|
||||
while (PQisBusy(conn))
|
||||
{
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
@@ -599,37 +423,40 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(shard->conn))
if (!PQconsumeInput(conn))
{
char *msg = pchomp(PQerrorMessage(shard->conn));
char *msg = pchomp(PQerrorMessage(conn));

PQfinish(conn);
FreeWaitEventSet(wes);

CLEANUP_AND_DISCONNECT(shard);
neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s",
msg);
pfree(msg);
return false;
/* Returning from inside PG_TRY is bad, so we break/return later */
broke_from_loop = true;
break;
}
}
}

shard->state = PS_Connected;
/* fallthrough */
}
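The handshake loop above follows libpq's asynchronous processing pattern: wait for the socket, feed bytes to libpq with PQconsumeInput(), and only fetch a result once PQisBusy() says a complete one is buffered. A standalone sketch of the same pattern (illustrative only, using select() instead of a WaitEventSet):

#include <sys/select.h>
#include <libpq-fe.h>

/* Wait for data, feed it to libpq, and return the next result. */
static PGresult *
get_result_nonblocking(PGconn *conn)
{
	while (PQisBusy(conn))
	{
		fd_set		fds;
		int			sock = PQsocket(conn);

		FD_ZERO(&fds);
		FD_SET(sock, &fds);
		(void) select(sock + 1, &fds, NULL, NULL, NULL);	/* wait for data */
		if (!PQconsumeInput(conn))
			return NULL;		/* connection broken */
	}
	return PQgetResult(conn);	/* NULL when no more results are pending */
}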
case PS_Connected:
|
||||
/*
|
||||
* We successfully connected. Future connections to this PageServer
|
||||
* will do fast retries again, with exponential backoff.
|
||||
*/
|
||||
shard->delay_us = MIN_RECONNECT_INTERVAL_USEC;
|
||||
|
||||
neon_shard_log(shard_no, DEBUG5, "Connection state: Connected");
|
||||
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
|
||||
return true;
|
||||
default:
|
||||
neon_shard_log(shard_no, ERROR, "libpagestore: invalid connection state %d", shard->state);
|
||||
PG_CATCH();
|
||||
{
|
||||
PQfinish(conn);
|
||||
FreeWaitEventSet(wes);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
/* This shouldn't be hit */
|
||||
Assert(false);
|
||||
PG_END_TRY();
|
||||
|
||||
if (broke_from_loop)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
|
||||
page_servers[shard_no].conn = conn;
|
||||
page_servers[shard_no].wes = wes;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -649,7 +476,7 @@ retry:
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(page_servers[shard_no].wes_read, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
@@ -675,8 +502,7 @@ retry:
|
||||
|
||||
/*
|
||||
* Reset prefetch and drop connection to the shard.
|
||||
* It also drops connection to all other shards involved in prefetch, through
|
||||
* prefetch_on_ps_disconnect().
|
||||
* It also drops connection to all other shards involved in prefetch.
|
||||
*/
|
||||
static void
|
||||
pageserver_disconnect(shardno_t shard_no)
|
||||
@@ -686,6 +512,9 @@ pageserver_disconnect(shardno_t shard_no)
|
||||
* whole prefetch queue, even for other pageservers. It should not
|
||||
* cause big problems, because connection loss is supposed to be a
|
||||
* rare event.
|
||||
*
|
||||
* Prefetch state should be reset even if page_servers[shard_no].conn == NULL,
|
||||
* because prefetch request may be registered before connection is established.
|
||||
*/
|
||||
prefetch_on_ps_disconnect();
|
||||
|
||||
@@ -698,36 +527,37 @@ pageserver_disconnect(shardno_t shard_no)
|
||||
static void
|
||||
pageserver_disconnect_shard(shardno_t shard_no)
|
||||
{
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
/*
|
||||
* If anything goes wrong while we were sending a request, it's not clear
|
||||
* what state the connection is in. For example, if we sent the request
|
||||
* but didn't receive a response yet, we might receive the response some
|
||||
* time later after we have already sent a new unrelated request. Close
|
||||
* the connection to avoid getting confused.
|
||||
* Similarly, even when we're in PS_DISCONNECTED, we may have junk to
|
||||
* clean up: It is possible that we encountered an error allocating any
|
||||
* of the wait event sets or the psql connection, or failed when we tried
|
||||
* to attach wait events to the WaitEventSets.
|
||||
*/
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
shard->state = PS_Disconnected;
|
||||
if (page_servers[shard_no].conn)
|
||||
{
|
||||
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
|
||||
PQfinish(page_servers[shard_no].conn);
|
||||
page_servers[shard_no].conn = NULL;
|
||||
}
|
||||
if (page_servers[shard_no].wes != NULL)
|
||||
{
|
||||
FreeWaitEventSet(page_servers[shard_no].wes);
|
||||
page_servers[shard_no].wes = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static bool
|
||||
pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
{
|
||||
StringInfoData req_buff;
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
PGconn *pageserver_conn;
|
||||
PGconn *pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
if (shard->state == PS_Connected && PQstatus(shard->conn) == CONNECTION_BAD)
|
||||
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection");
|
||||
pageserver_disconnect(shard_no);
|
||||
pageserver_conn = NULL;
|
||||
}
|
||||
|
||||
req_buff = nm_pack_request(request);
|
||||
@@ -741,19 +571,17 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
* https://github.com/neondatabase/neon/issues/1138 So try to reestablish
|
||||
* connection in case of failure.
|
||||
*/
|
||||
if (shard->state != PS_Connected)
|
||||
if (!page_servers[shard_no].conn)
|
||||
{
|
||||
while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
{
|
||||
HandleMainLoopInterrupts();
|
||||
shard->n_reconnect_attempts += 1;
|
||||
n_reconnect_attempts += 1;
|
||||
}
|
||||
shard->n_reconnect_attempts = 0;
|
||||
} else {
|
||||
Assert(shard->conn != NULL);
|
||||
n_reconnect_attempts = 0;
|
||||
}
|
||||
|
||||
pageserver_conn = shard->conn;
|
||||
pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
/*
* Send request.
@@ -762,17 +590,13 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
*
* Note that this also will fail when the connection is in the
* PGRES_POLLING_WRITING state. It's kinda dirty to disconnect at this
* point, but on the grand scheme of things it's only a small issue.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));

pageserver_disconnect(shard_no);
neon_shard_log(shard_no, LOG, "pageserver_send disconnected: failed to send page request (try to reconnect): %s", msg);
neon_shard_log(shard_no, LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pfree(msg);
pfree(req_buff.data);
return false;
@@ -787,7 +611,6 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}

return true;
}
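As the comment notes, PQputCopyData() is used here in blocking mode. A minimal sketch of the send half of this COPY-both exchange (illustrative only, not the extension's actual helper):

#include <stdbool.h>
#include <libpq-fe.h>

/* Queue one packed request frame and push it onto the wire.  In blocking
 * mode PQflush() returns only 0 (done) or -1 (error); the 1 ("not all data
 * sent yet") case can only occur on nonblocking connections. */
static bool
send_copy_frame(PGconn *conn, const char *buf, int len)
{
	if (PQputCopyData(conn, buf, len) <= 0)
		return false;			/* 0 = would block (nonblocking mode), -1 = error */
	return PQflush(conn) == 0;
}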
|
||||
@@ -796,68 +619,58 @@ pageserver_receive(shardno_t shard_no)
|
||||
{
|
||||
StringInfoData resp_buff;
|
||||
NeonResponse *resp;
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
PGconn *pageserver_conn = shard->conn;
|
||||
/* read response */
|
||||
int rc;
|
||||
PGconn *pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
if (shard->state != PS_Connected)
|
||||
{
|
||||
neon_shard_log(shard_no, LOG,
|
||||
"pageserver_receive: returning NULL for non-connected pageserver connection: 0x%02x",
|
||||
shard->state);
|
||||
if (!pageserver_conn)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Assert(pageserver_conn);
|
||||
|
||||
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
|
||||
if (rc >= 0)
|
||||
PG_TRY();
|
||||
{
|
||||
/* call_PQgetCopyData handles rc == 0 */
|
||||
Assert(rc > 0);
|
||||
/* read response */
|
||||
int rc;
|
||||
|
||||
PG_TRY();
|
||||
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
|
||||
if (rc >= 0)
|
||||
{
|
||||
resp_buff.len = rc;
|
||||
resp_buff.cursor = 0;
|
||||
resp = nm_unpack_response(&resp_buff);
|
||||
PQfreemem(resp_buff.data);
|
||||
|
||||
if (message_level_is_interesting(PageStoreTrace))
|
||||
{
|
||||
char *msg = nm_to_string((NeonMessage *) resp);
|
||||
|
||||
neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
|
||||
pfree(msg);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
else if (rc == -1)
|
||||
{
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due malformatted response");
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
|
||||
pageserver_disconnect(shard_no);
|
||||
PG_RE_THROW();
|
||||
resp = NULL;
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
if (message_level_is_interesting(PageStoreTrace))
|
||||
else if (rc == -2)
|
||||
{
|
||||
char *msg = nm_to_string((NeonMessage *) resp);
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
|
||||
pfree(msg);
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
|
||||
}
|
||||
}
|
||||
else if (rc == -1)
|
||||
PG_CATCH();
|
||||
{
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn)));
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect due to caught exception");
|
||||
pageserver_disconnect(shard_no);
|
||||
resp = NULL;
|
||||
}
|
||||
else if (rc == -2)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
|
||||
PG_RE_THROW();
}
PG_END_TRY();

return (NeonResponse *) resp;
}
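On the receive side, call_PQgetCopyData() appears to wrap libpq's PQgetCopyData(), whose return codes drive the branches above: a positive value is the length of one complete COPY frame, -1 means the server ended COPY mode, and -2 means the connection failed. A hypothetical standalone sketch of that contract:

#include <libpq-fe.h>

/* Fetch one COPY frame, blocking until it is complete. */
static char *
recv_copy_frame(PGconn *conn, int *len_out)
{
	char	   *buf = NULL;

	*len_out = PQgetCopyData(conn, &buf, 0 /* block until a full frame arrives */);
	if (*len_out > 0)
		return buf;				/* caller must release it with PQfreemem() */
	return NULL;				/* -1: COPY finished, -2: connection error */
}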
@@ -868,7 +681,7 @@ pageserver_flush(shardno_t shard_no)
|
||||
{
|
||||
PGconn *pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
if (page_servers[shard_no].state != PS_Connected)
|
||||
if (!pageserver_conn)
|
||||
{
|
||||
neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected");
|
||||
}
|
||||
@@ -884,7 +697,6 @@ pageserver_flush(shardno_t shard_no)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -1079,7 +891,5 @@ pg_init_libpagestore(void)
|
||||
dbsize_hook = neon_dbsize;
|
||||
}
|
||||
|
||||
memset(page_servers, 0, sizeof(page_servers));
|
||||
|
||||
lfc_init();
|
||||
}
|
||||
|
||||
@@ -94,10 +94,6 @@ static char *hexdump_page(char *page);
|
||||
|
||||
const int SmgrTrace = DEBUG5;
|
||||
|
||||
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
|
||||
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
|
||||
##__VA_ARGS__)
|
||||
|
||||
page_server_api *page_server;
|
||||
|
||||
/* unlogged relation build states */
|
||||
@@ -530,8 +526,6 @@ prefetch_flush_requests(void)
|
||||
*
|
||||
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
||||
* invalidates any active pointers into the hash table.
|
||||
* NOTE: callers should make sure they can handle query cancellations in this
|
||||
* function's call path.
|
||||
*/
|
||||
static bool
|
||||
prefetch_wait_for(uint64 ring_index)
|
||||
@@ -567,8 +561,6 @@ prefetch_wait_for(uint64 ring_index)
|
||||
*
|
||||
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
||||
* invalidates any active pointers into the hash table.
|
||||
*
|
||||
* NOTE: this does IO, and can get canceled out-of-line.
|
||||
*/
|
||||
static bool
|
||||
prefetch_read(PrefetchRequest *slot)
|
||||
@@ -580,14 +572,6 @@ prefetch_read(PrefetchRequest *slot)
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
neon_shard_log(slot->shard_no, ERROR,
|
||||
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
|
||||
slot->status, slot->response,
|
||||
(long)slot->my_ring_index, (long)MyPState->ring_receive);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive(slot->shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
@@ -605,11 +589,6 @@ prefetch_read(PrefetchRequest *slot)
|
||||
}
|
||||
else
|
||||
{
|
||||
neon_shard_log(slot->shard_no, WARNING,
|
||||
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
|
||||
(long)slot->my_ring_index,
|
||||
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -624,7 +603,6 @@ void
|
||||
prefetch_on_ps_disconnect(void)
|
||||
{
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
|
||||
while (MyPState->ring_receive < MyPState->ring_unused)
|
||||
{
|
||||
PrefetchRequest *slot;
|
||||
@@ -647,7 +625,6 @@ prefetch_on_ps_disconnect(void)
|
||||
slot->status = PRFS_TAG_REMAINS;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
MyPState->ring_receive += 1;
|
||||
|
||||
prefetch_set_unused(ring_index);
|
||||
}
|
||||
}
|
||||
@@ -714,8 +691,6 @@ static void
|
||||
prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns)
|
||||
{
|
||||
bool found;
|
||||
uint64 mySlotNo = slot->my_ring_index;
|
||||
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
/* lsn and not_modified_since are filled in below */
|
||||
@@ -724,8 +699,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
.blkno = slot->buftag.blockNum,
|
||||
};
|
||||
|
||||
Assert(mySlotNo == MyPState->ring_unused);
|
||||
|
||||
if (force_request_lsns)
|
||||
slot->request_lsns = *force_request_lsns;
|
||||
else
|
||||
@@ -738,11 +711,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
|
||||
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
|
||||
{
|
||||
Assert(mySlotNo == MyPState->ring_unused);
|
||||
/* loop */
|
||||
}
|
||||
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_requests_inflight += 1;
|
||||
@@ -753,6 +722,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
|
||||
prfh_insert(MyPState->prf_hash, slot, &found);
|
||||
Assert(!found);
|
||||
}
|
||||
@@ -924,10 +894,6 @@ Retry:
|
||||
return ring_index;
|
||||
}
|
||||
|
||||
/*
|
||||
* Note: this function can get canceled and use a long jump to the next catch
|
||||
* context. Take care.
|
||||
*/
|
||||
static NeonResponse *
|
||||
page_server_request(void const *req)
|
||||
{
|
||||
@@ -959,38 +925,19 @@ page_server_request(void const *req)
|
||||
* Current sharding model assumes that all metadata is present only at shard 0.
|
||||
* We still need to call get_shard_no() to check if shard map is up-to-date.
|
||||
*/
|
||||
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest ||
|
||||
((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
|
||||
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
|
||||
{
|
||||
shard_no = 0;
|
||||
}
|
||||
|
||||
do
|
||||
{
|
||||
PG_TRY();
|
||||
{
|
||||
while (!page_server->send(shard_no, (NeonRequest *) req)
|
||||
|| !page_server->flush(shard_no))
|
||||
{
|
||||
/* do nothing */
|
||||
}
|
||||
consume_prefetch_responses();
|
||||
resp = page_server->receive(shard_no);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/*
|
||||
* Cancellation in this code needs to be handled better at some
|
||||
* point, but this currently seems fine for now.
|
||||
*/
|
||||
page_server->disconnect(shard_no);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
|
||||
consume_prefetch_responses();
|
||||
resp = page_server->receive(shard_no);
|
||||
} while (resp == NULL);
|
||||
|
||||
return resp;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -1958,9 +1905,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
break;
|
||||
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
|
||||
"Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x",
|
||||
T_NeonExistsResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_exists", resp->tag);
|
||||
}
|
||||
pfree(resp);
|
||||
return exists;
|
||||
@@ -2412,7 +2357,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
/*
|
||||
* Try to find prefetched page in the list of received pages.
|
||||
*/
|
||||
Retry:
|
||||
Retry:
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
|
||||
|
||||
if (entry != NULL)
|
||||
@@ -2498,9 +2443,7 @@ Retry:
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
|
||||
"Expected GetPage (0x%02x) or Error (0x%02x) response to GetPageRequest, but got 0x%02x",
|
||||
T_NeonGetPageResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_at_lsn", resp->tag);
|
||||
}
|
||||
|
||||
/* buffer was used, clean up for later reuse */
|
||||
@@ -2771,9 +2714,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
break;
|
||||
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
|
||||
"Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x",
|
||||
T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_nblocks", resp->tag);
|
||||
}
|
||||
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
|
||||
|
||||
@@ -2826,9 +2767,7 @@ neon_dbsize(Oid dbNode)
|
||||
break;
|
||||
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
|
||||
"Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x",
|
||||
T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_dbsize", resp->tag);
|
||||
}
|
||||
|
||||
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
|
||||
@@ -3167,9 +3106,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
||||
break;
|
||||
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
|
||||
"Expected GetSlruSegment (0x%02x) or Error (0x%02x) response to GetSlruSegmentRequest, but got 0x%02x",
|
||||
T_NeonGetSlruSegmentResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_slru_segment", resp->tag);
|
||||
}
|
||||
pfree(resp);
|
||||
|
||||
|
||||
@@ -38,7 +38,6 @@ hmac.workspace = true
|
||||
hostname.workspace = true
|
||||
http.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
|
||||
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
|
||||
@@ -83,7 +82,6 @@ thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
|
||||
tokio-postgres.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio = { workspace = true, features = ["signal"] }
|
||||
@@ -96,8 +94,10 @@ url.workspace = true
|
||||
urlencoding.workspace = true
|
||||
utils.workspace = true
|
||||
uuid.workspace = true
|
||||
rustls-native-certs.workspace = true
|
||||
webpki-roots.workspace = true
|
||||
x509-parser.workspace = true
|
||||
native-tls.workspace = true
|
||||
postgres-native-tls.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
redis.workspace = true
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ use futures::future::Either;
|
||||
use itertools::Itertools;
|
||||
use proxy::config::TlsServerEndPoint;
|
||||
use proxy::context::RequestMonitoring;
|
||||
use proxy::metrics::{Metrics, ThreadPoolMetrics};
|
||||
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled};
|
||||
use rustls::pki_types::PrivateKeyDer;
|
||||
use tokio::net::TcpListener;
|
||||
@@ -66,8 +65,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
Metrics::install(Arc::new(ThreadPoolMetrics::new(0)));
|
||||
|
||||
let args = cli().get_matches();
|
||||
let destination: String = args.get_one::<String>("dest").unwrap().parse()?;
|
||||
|
||||
|
||||
@@ -557,14 +557,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
|
||||
let config::ConcurrencyLockOptions {
|
||||
shards,
|
||||
limiter,
|
||||
permits,
|
||||
epoch,
|
||||
timeout,
|
||||
} = args.wake_compute_lock.parse()?;
|
||||
info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
|
||||
info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)");
|
||||
let locks = Box::leak(Box::new(console::locks::ApiLocks::new(
|
||||
"wake_compute_lock",
|
||||
limiter,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
epoch,
|
||||
@@ -603,19 +603,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
|
||||
let config::ConcurrencyLockOptions {
|
||||
shards,
|
||||
limiter,
|
||||
permits,
|
||||
epoch,
|
||||
timeout,
|
||||
} = args.connect_compute_lock.parse()?;
|
||||
info!(
|
||||
?limiter,
|
||||
shards,
|
||||
?epoch,
|
||||
"Using NodeLocks (connect_compute)"
|
||||
);
|
||||
info!(permits, shards, ?epoch, "Using NodeLocks (connect_compute)");
|
||||
let connect_compute_locks = console::locks::ApiLocks::new(
|
||||
"connect_compute_lock",
|
||||
limiter,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
epoch,
|
||||
|
||||
@@ -10,14 +10,11 @@ use crate::{
|
||||
};
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use rustls::{client::danger::ServerCertVerifier, pki_types::InvalidDnsNameError};
|
||||
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use std::{io, net::SocketAddr, time::Duration};
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_postgres::tls::MakeTlsConnect;
|
||||
use tokio_postgres_rustls::MakeRustlsConnect;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
|
||||
@@ -33,7 +30,7 @@ pub enum ConnectionError {
|
||||
CouldNotConnect(#[from] io::Error),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
TlsError(#[from] InvalidDnsNameError),
|
||||
TlsError(#[from] native_tls::Error),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
WakeComputeError(#[from] WakeComputeError),
|
||||
@@ -260,7 +257,7 @@ pub struct PostgresConnection {
|
||||
/// Socket connected to a compute node.
|
||||
pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream<
|
||||
tokio::net::TcpStream,
|
||||
tokio_postgres_rustls::RustlsStream<tokio::net::TcpStream>,
|
||||
postgres_native_tls::TlsStream<tokio::net::TcpStream>,
|
||||
>,
|
||||
/// PostgreSQL connection parameters.
|
||||
pub params: std::collections::HashMap<String, String>,
|
||||
@@ -285,23 +282,12 @@ impl ConnCfg {
|
||||
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
|
||||
drop(pause);
|
||||
|
||||
let client_config = if allow_self_signed_compute {
|
||||
// Allow all certificates for creating the connection
|
||||
let verifier = Arc::new(AcceptEverythingVerifier) as Arc<dyn ServerCertVerifier>;
|
||||
rustls::ClientConfig::builder()
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(verifier)
|
||||
} else {
|
||||
let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
|
||||
rustls::ClientConfig::builder().with_root_certificates(root_store)
|
||||
};
|
||||
let client_config = client_config.with_no_client_auth();
|
||||
|
||||
let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
|
||||
&mut mk_tls,
|
||||
host,
|
||||
)?;
|
||||
let tls_connector = native_tls::TlsConnector::builder()
|
||||
.danger_accept_invalid_certs(allow_self_signed_compute)
|
||||
.build()
|
||||
.unwrap();
|
||||
let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
|
||||
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
|
||||
|
||||
// connect_raw() will not use TLS if sslmode is "disable"
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
|
||||
@@ -354,58 +340,6 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
|
||||
Some(options)
|
||||
}
|
||||
|
||||
fn load_certs() -> Result<Arc<rustls::RootCertStore>, io::Error> {
|
||||
let der_certs = rustls_native_certs::load_native_certs()?;
|
||||
let mut store = rustls::RootCertStore::empty();
|
||||
store.add_parsable_certificates(der_certs);
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AcceptEverythingVerifier;
|
||||
impl ServerCertVerifier for AcceptEverythingVerifier {
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
use rustls::SignatureScheme::*;
|
||||
// The schemes for which `SignatureScheme::supported_in_tls13` returns true.
|
||||
vec![
|
||||
ECDSA_NISTP521_SHA512,
|
||||
ECDSA_NISTP384_SHA384,
|
||||
ECDSA_NISTP256_SHA256,
|
||||
RSA_PSS_SHA512,
|
||||
RSA_PSS_SHA384,
|
||||
RSA_PSS_SHA256,
|
||||
ED25519,
|
||||
]
|
||||
}
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||
_server_name: &rustls::pki_types::ServerName<'_>,
|
||||
_ocsp_response: &[u8],
|
||||
_now: rustls::pki_types::UnixTime,
|
||||
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
|
||||
Ok(rustls::client::danger::ServerCertVerified::assertion())
|
||||
}
|
||||
fn verify_tls12_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
fn verify_tls13_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
auth::{self, backend::AuthRateLimiter},
|
||||
console::locks::ApiLocks,
|
||||
rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig},
|
||||
rate_limiter::RateBucketInfo,
|
||||
scram::threadpool::ThreadPool,
|
||||
serverless::{cancel_set::CancelSet, GlobalConnPoolOptions},
|
||||
Host,
|
||||
@@ -580,18 +580,14 @@ impl RetryConfig {
|
||||
}
|
||||
|
||||
/// Helper for cmdline cache options parsing.
#[derive(serde::Deserialize)]
pub struct ConcurrencyLockOptions {
/// The number of shards the lock map should have
pub shards: usize,
/// The number of allowed concurrent requests for each endpoint
#[serde(flatten)]
pub limiter: RateLimiterConfig,
pub permits: usize,
/// Garbage collection epoch
#[serde(deserialize_with = "humantime_serde::deserialize")]
pub epoch: Duration,
/// Lock timeout
#[serde(deserialize_with = "humantime_serde::deserialize")]
pub timeout: Duration,
}
|
||||
|
||||
@@ -600,18 +596,13 @@ impl ConcurrencyLockOptions {
|
||||
pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
|
||||
/// Default options for [`crate::console::provider::ApiLocks`].
|
||||
pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
|
||||
"shards=64,permits=100,epoch=10m,timeout=10ms";
|
||||
"shards=64,permits=10,epoch=10m,timeout=10ms";
|
||||
|
||||
// pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
|
||||
|
||||
/// Parse lock options passed via cmdline.
|
||||
/// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
|
||||
fn parse(options: &str) -> anyhow::Result<Self> {
|
||||
let options = options.trim();
|
||||
if options.starts_with('{') && options.ends_with('}') {
|
||||
return Ok(serde_json::from_str(options)?);
|
||||
}
|
||||
|
||||
let mut shards = None;
|
||||
let mut permits = None;
|
||||
let mut epoch = None;
|
||||
@@ -638,13 +629,9 @@ impl ConcurrencyLockOptions {
|
||||
shards = Some(2);
|
||||
}
|
||||
|
||||
let permits = permits.context("missing `permits`")?;
|
||||
let out = Self {
|
||||
shards: shards.context("missing `shards`")?,
|
||||
limiter: RateLimiterConfig {
|
||||
algorithm: RateLimitAlgorithm::Fixed,
|
||||
initial_limit: permits,
|
||||
},
|
||||
permits: permits.context("missing `permits`")?,
|
||||
epoch: epoch.context("missing `epoch`")?,
|
||||
timeout: timeout.context("missing `timeout`")?,
|
||||
};
|
||||
@@ -670,8 +657,6 @@ impl FromStr for ConcurrencyLockOptions {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::rate_limiter::Aimd;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
@@ -699,68 +684,36 @@ mod tests {
|
||||
fn test_parse_lock_options() -> anyhow::Result<()> {
|
||||
let ConcurrencyLockOptions {
|
||||
epoch,
|
||||
limiter,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
} = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(10 * 60));
|
||||
assert_eq!(timeout, Duration::from_secs(1));
|
||||
assert_eq!(shards, 32);
|
||||
assert_eq!(limiter.initial_limit, 4);
|
||||
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
|
||||
assert_eq!(permits, 4);
|
||||
|
||||
let ConcurrencyLockOptions {
|
||||
epoch,
|
||||
limiter,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
} = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(60));
|
||||
assert_eq!(timeout, Duration::from_millis(100));
|
||||
assert_eq!(shards, 16);
|
||||
assert_eq!(limiter.initial_limit, 8);
|
||||
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
|
||||
assert_eq!(permits, 8);
|
||||
|
||||
let ConcurrencyLockOptions {
|
||||
epoch,
|
||||
limiter,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
} = "permits=0".parse()?;
|
||||
assert_eq!(epoch, Duration::ZERO);
|
||||
assert_eq!(timeout, Duration::ZERO);
|
||||
assert_eq!(shards, 2);
|
||||
assert_eq!(limiter.initial_limit, 0);
|
||||
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_json_lock_options() -> anyhow::Result<()> {
|
||||
let ConcurrencyLockOptions {
|
||||
epoch,
|
||||
limiter,
|
||||
shards,
|
||||
timeout,
|
||||
} = r#"{"shards":32,"initial_limit":44,"aimd":{"min":5,"max":500,"inc":10,"dec":0.9,"utilisation":0.8},"epoch":"10m","timeout":"1s"}"#
|
||||
.parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(10 * 60));
|
||||
assert_eq!(timeout, Duration::from_secs(1));
|
||||
assert_eq!(shards, 32);
|
||||
assert_eq!(limiter.initial_limit, 44);
|
||||
assert_eq!(
|
||||
limiter.algorithm,
|
||||
RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 5,
|
||||
max: 500,
|
||||
dec: 0.9,
|
||||
inc: 10,
|
||||
utilisation: 0.8
|
||||
}
|
||||
},
|
||||
);
|
||||
assert_eq!(permits, 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -15,11 +15,11 @@ use crate::{
|
||||
error::ReportableError,
|
||||
intern::ProjectIdInt,
|
||||
metrics::ApiLockMetrics,
|
||||
rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token},
|
||||
scram, EndpointCacheKey,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use std::{hash::Hash, sync::Arc, time::Duration};
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
|
||||
@@ -443,8 +443,8 @@ impl ApiCaches {
|
||||
/// Various caches for [`console`](super).
|
||||
pub struct ApiLocks<K> {
|
||||
name: &'static str,
|
||||
node_locks: DashMap<K, Arc<DynamicLimiter>>,
|
||||
config: RateLimiterConfig,
|
||||
node_locks: DashMap<K, Arc<Semaphore>>,
|
||||
permits: usize,
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
metrics: &'static ApiLockMetrics,
|
||||
@@ -452,6 +452,8 @@ pub struct ApiLocks<K> {
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ApiLockError {
|
||||
#[error("lock was closed")]
|
||||
AcquireError(#[from] tokio::sync::AcquireError),
|
||||
#[error("permit could not be acquired")]
|
||||
TimeoutError(#[from] tokio::time::error::Elapsed),
|
||||
}
|
||||
@@ -459,6 +461,7 @@ pub enum ApiLockError {
|
||||
impl ReportableError for ApiLockError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
ApiLockError::AcquireError(_) => crate::error::ErrorKind::Service,
|
||||
ApiLockError::TimeoutError(_) => crate::error::ErrorKind::RateLimit,
|
||||
}
|
||||
}
|
||||
@@ -467,7 +470,7 @@ impl ReportableError for ApiLockError {
|
||||
impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
pub fn new(
|
||||
name: &'static str,
|
||||
config: RateLimiterConfig,
|
||||
permits: usize,
|
||||
shards: usize,
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
@@ -476,7 +479,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
Ok(Self {
|
||||
name,
|
||||
node_locks: DashMap::with_shard_amount(shards),
|
||||
config,
|
||||
permits,
|
||||
timeout,
|
||||
epoch,
|
||||
metrics,
|
||||
@@ -484,10 +487,8 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
}
|
||||
|
||||
pub async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, ApiLockError> {
|
||||
if self.config.initial_limit == 0 {
|
||||
return Ok(WakeComputePermit {
|
||||
permit: Token::disabled(),
|
||||
});
|
||||
if self.permits == 0 {
|
||||
return Ok(WakeComputePermit { permit: None });
|
||||
}
|
||||
let now = Instant::now();
|
||||
let semaphore = {
|
||||
@@ -499,22 +500,24 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| {
|
||||
self.metrics.semaphores_registered.inc();
|
||||
DynamicLimiter::new(self.config)
|
||||
Arc::new(Semaphore::new(self.permits))
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
};
|
||||
let permit = semaphore.acquire_deadline(now + self.timeout).await;
|
||||
let permit = tokio::time::timeout_at(now + self.timeout, semaphore.acquire_owned()).await;
|
||||
|
||||
self.metrics
|
||||
.semaphore_acquire_seconds
|
||||
.observe(now.elapsed().as_secs_f64());
|
||||
|
||||
Ok(WakeComputePermit { permit: permit? })
|
||||
Ok(WakeComputePermit {
|
||||
permit: Some(permit??),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn garbage_collect_worker(&self) {
|
||||
if self.config.initial_limit == 0 {
|
||||
if self.permits == 0 {
|
||||
return;
|
||||
}
|
||||
let mut interval =
|
||||
@@ -544,21 +547,12 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
}
|
||||
|
||||
pub struct WakeComputePermit {
|
||||
permit: Token,
|
||||
// None if the lock is disabled
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
impl WakeComputePermit {
|
||||
pub fn should_check_cache(&self) -> bool {
|
||||
!self.permit.is_disabled()
|
||||
}
|
||||
pub fn release(self, outcome: Outcome) {
|
||||
self.permit.release(outcome)
|
||||
}
|
||||
pub fn release_result<T, E>(self, res: Result<T, E>) -> Result<T, E> {
|
||||
match res {
|
||||
Ok(_) => self.release(Outcome::Success),
|
||||
Err(_) => self.release(Outcome::Overload),
|
||||
}
|
||||
res
|
||||
self.permit.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,7 +301,7 @@ impl super::Api for Api {
|
||||
}
|
||||
}
|
||||
|
||||
let mut node = permit.release_result(self.do_wake_compute(ctx, user_info).await)?;
|
||||
let mut node = self.do_wake_compute(ctx, user_info).await?;
|
||||
ctx.set_project(node.aux.clone());
|
||||
let cold_start_info = node.aux.cold_start_info;
|
||||
info!("woken up a compute node");
|
||||
|
||||
@@ -84,8 +84,8 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
timeout: time::Duration,
|
||||
) -> Result<PostgresConnection, Self::Error> {
|
||||
let host = node_info.config.get_host()?;
|
||||
let permit = self.locks.get_permit(&host).await?;
|
||||
permit.release_result(node_info.connect(ctx, timeout).await)
|
||||
let _permit = self.locks.get_permit(&host).await?;
|
||||
node_info.connect(ctx, timeout).await
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
|
||||
|
||||
@@ -1,6 +1,2 @@
|
||||
mod limit_algorithm;
|
||||
mod limiter;
|
||||
pub use limit_algorithm::{
|
||||
aimd::Aimd, DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token,
|
||||
};
|
||||
pub use limiter::{BucketRateLimiter, EndpointRateLimiter, GlobalRateLimiter, RateBucketInfo};
|
||||
|
||||
@@ -1,275 +0,0 @@
|
||||
//! Algorithms for controlling concurrency limits.
|
||||
use parking_lot::Mutex;
|
||||
use std::{pin::pin, sync::Arc, time::Duration};
|
||||
use tokio::{
|
||||
sync::Notify,
|
||||
time::{error::Elapsed, timeout_at, Instant},
|
||||
};
|
||||
|
||||
use self::aimd::Aimd;
|
||||
|
||||
pub mod aimd;
|
||||
|
||||
/// Whether a job succeeded or failed as a result of congestion/overload.
|
||||
///
|
||||
/// Errors not considered to be caused by overload should be ignored.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum Outcome {
|
||||
/// The job succeeded, or failed in a way unrelated to overload.
|
||||
Success,
|
||||
/// The job failed because of overload, e.g. it timed out or an explicit backpressure signal
|
||||
/// was observed.
|
||||
Overload,
|
||||
}
|
||||
|
||||
/// An algorithm for controlling a concurrency limit.
|
||||
pub trait LimitAlgorithm: Send + Sync + 'static {
|
||||
/// Update the concurrency limit in response to a new job completion.
|
||||
fn update(&self, old_limit: usize, sample: Sample) -> usize;
|
||||
}
|
||||
|
||||
/// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
|
||||
pub struct Sample {
|
||||
pub(crate) latency: Duration,
|
||||
/// Jobs in flight when the sample was taken.
|
||||
pub(crate) in_flight: usize,
|
||||
pub(crate) outcome: Outcome,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum RateLimitAlgorithm {
|
||||
#[default]
|
||||
Fixed,
|
||||
Aimd {
|
||||
#[serde(flatten)]
|
||||
conf: Aimd,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct Fixed;
|
||||
|
||||
impl LimitAlgorithm for Fixed {
|
||||
fn update(&self, old_limit: usize, _sample: Sample) -> usize {
|
||||
old_limit
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
|
||||
pub struct RateLimiterConfig {
|
||||
#[serde(flatten)]
|
||||
pub algorithm: RateLimitAlgorithm,
|
||||
pub initial_limit: usize,
|
||||
}
|
||||
|
||||
impl RateLimiterConfig {
|
||||
pub fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
|
||||
match self.algorithm {
|
||||
RateLimitAlgorithm::Fixed => Box::new(Fixed),
|
||||
RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LimiterInner {
|
||||
alg: Box<dyn LimitAlgorithm>,
|
||||
available: usize,
|
||||
limit: usize,
|
||||
in_flight: usize,
|
||||
}
|
||||
|
||||
impl LimiterInner {
|
||||
fn update(&mut self, latency: Duration, outcome: Option<Outcome>) {
|
||||
if let Some(outcome) = outcome {
|
||||
let sample = Sample {
|
||||
latency,
|
||||
in_flight: self.in_flight,
|
||||
outcome,
|
||||
};
|
||||
self.limit = self.alg.update(self.limit, sample);
|
||||
}
|
||||
}
|
||||
|
||||
fn take(&mut self, ready: &Notify) -> Option<()> {
|
||||
if self.available > 1 {
|
||||
self.available -= 1;
|
||||
self.in_flight += 1;
|
||||
|
||||
// tell the next in the queue that there is a permit ready
|
||||
if self.available > 1 {
|
||||
ready.notify_one();
|
||||
}
|
||||
Some(())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Limits the number of concurrent jobs.
|
||||
///
|
||||
/// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
|
||||
/// token once the job is finished.
|
||||
///
|
||||
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
|
||||
/// caused by overload (loss).
|
||||
pub struct DynamicLimiter {
|
||||
config: RateLimiterConfig,
|
||||
inner: Mutex<LimiterInner>,
|
||||
// to notify when a token is available
|
||||
ready: Notify,
|
||||
}
|
||||
|
||||
/// A concurrency token, required to run a job.
|
||||
///
|
||||
/// Release the token back to the [`DynamicLimiter`] after the job is complete.
|
||||
pub struct Token {
|
||||
start: Instant,
|
||||
limiter: Option<Arc<DynamicLimiter>>,
|
||||
}
|
||||
|
||||
/// A snapshot of the state of the [`DynamicLimiter`].
|
||||
///
|
||||
/// Not guaranteed to be consistent under high concurrency.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct LimiterState {
|
||||
limit: usize,
|
||||
in_flight: usize,
|
||||
}
|
||||
|
||||
impl DynamicLimiter {
|
||||
/// Create a limiter with a given limit control algorithm.
|
||||
pub fn new(config: RateLimiterConfig) -> Arc<Self> {
|
||||
let ready = Notify::new();
|
||||
ready.notify_one();
|
||||
|
||||
Arc::new(Self {
|
||||
inner: Mutex::new(LimiterInner {
|
||||
alg: config.create_rate_limit_algorithm(),
|
||||
available: config.initial_limit,
|
||||
limit: config.initial_limit,
|
||||
in_flight: 0,
|
||||
}),
|
||||
ready,
|
||||
config,
|
||||
})
|
||||
}
|
||||
|
||||
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
|
||||
///
|
||||
/// Returns `None` if there are none available after `duration`.
|
||||
pub async fn acquire_timeout(self: &Arc<Self>, duration: Duration) -> Result<Token, Elapsed> {
|
||||
self.acquire_deadline(Instant::now() + duration).await
|
||||
}
|
||||
|
||||
/// Try to acquire a concurrency [Token], waiting until `deadline` if there are none available.
|
||||
///
|
||||
/// Returns `None` if there are none available after `deadline`.
|
||||
pub async fn acquire_deadline(self: &Arc<Self>, deadline: Instant) -> Result<Token, Elapsed> {
|
||||
if self.config.initial_limit == 0 {
|
||||
// If the rate limiter is disabled, we can always acquire a token.
|
||||
Ok(Token::disabled())
|
||||
} else {
|
||||
let mut notified = pin!(self.ready.notified());
|
||||
let mut ready = notified.as_mut().enable();
|
||||
loop {
|
||||
let mut limit = None;
|
||||
if ready {
|
||||
let mut inner = self.inner.lock();
|
||||
if inner.take(&self.ready).is_some() {
|
||||
break Ok(Token::new(self.clone()));
|
||||
}
|
||||
limit = Some(inner.limit);
|
||||
}
|
||||
match timeout_at(deadline, notified.as_mut()).await {
|
||||
Ok(()) => ready = true,
|
||||
Err(e) => {
|
||||
let limit = limit.unwrap_or_else(|| self.inner.lock().limit);
|
||||
tracing::info!(limit, "could not acquire token in time");
|
||||
break Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the concurrency [Token], along with the outcome of the job.
|
||||
///
|
||||
/// The [Outcome] of the job, and the time taken to perform it, may be used
|
||||
/// to update the concurrency limit.
|
||||
///
|
||||
/// Set the outcome to `None` to ignore the job.
|
||||
fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
|
||||
tracing::info!("outcome is {:?}", outcome);
|
||||
if self.config.initial_limit == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
|
||||
inner.update(start.elapsed(), outcome);
|
||||
if inner.in_flight < inner.limit {
|
||||
inner.available = inner.limit - inner.in_flight;
|
||||
// At least 1 permit is now available
|
||||
self.ready.notify_one();
|
||||
}
|
||||
|
||||
inner.in_flight -= 1;
|
||||
}
|
||||
|
||||
/// The current state of the limiter.
|
||||
pub fn state(&self) -> LimiterState {
|
||||
let inner = self.inner.lock();
|
||||
LimiterState {
|
||||
limit: inner.limit,
|
||||
in_flight: inner.in_flight,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Token {
|
||||
fn new(limiter: Arc<DynamicLimiter>) -> Self {
|
||||
Self {
|
||||
start: Instant::now(),
|
||||
limiter: Some(limiter),
|
||||
}
|
||||
}
|
||||
pub fn disabled() -> Self {
|
||||
Self {
|
||||
start: Instant::now(),
|
||||
limiter: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_disabled(&self) -> bool {
|
||||
self.limiter.is_none()
|
||||
}
|
||||
|
||||
pub fn release(mut self, outcome: Outcome) {
|
||||
self.release_mut(Some(outcome))
|
||||
}
|
||||
|
||||
pub fn release_mut(&mut self, outcome: Option<Outcome>) {
|
||||
if let Some(limiter) = self.limiter.take() {
|
||||
limiter.release_inner(self.start, outcome);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Token {
|
||||
fn drop(&mut self) {
|
||||
self.release_mut(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl LimiterState {
|
||||
/// The current concurrency limit.
|
||||
pub fn limit(&self) -> usize {
|
||||
self.limit
|
||||
}
|
||||
/// The number of jobs in flight.
|
||||
pub fn in_flight(&self) -> usize {
|
||||
self.in_flight
|
||||
}
|
||||
}
|
||||
@@ -1,184 +0,0 @@
|
||||
use std::usize;
|
||||
|
||||
use super::{LimitAlgorithm, Outcome, Sample};
|
||||
|
||||
/// Loss-based congestion avoidance.
///
/// Additive-increase, multiplicative decrease.
///
/// Adds available concurrency when:
/// 1. no load-based errors are observed, and
/// 2. the utilisation of the current limit is high.
///
/// Reduces available concurrency by a factor when load-based errors are detected.
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
pub struct Aimd {
/// Minimum limit for AIMD algorithm.
pub min: usize,
/// Maximum limit for AIMD algorithm.
pub max: usize,
/// Multiplicative factor by which the limit is decreased in case of error.
pub dec: f32,
/// Amount by which the limit is increased in case of success.
pub inc: usize,
/// A threshold below which the limit won't be increased.
pub utilisation: f32,
}
|
||||
|
||||
impl LimitAlgorithm for Aimd {
|
||||
fn update(&self, old_limit: usize, sample: Sample) -> usize {
|
||||
use Outcome::*;
|
||||
match sample.outcome {
|
||||
Success => {
|
||||
let utilisation = sample.in_flight as f32 / old_limit as f32;
|
||||
|
||||
if utilisation > self.utilisation {
|
||||
let limit = old_limit + self.inc;
|
||||
let increased_limit = limit.clamp(self.min, self.max);
|
||||
if increased_limit > old_limit {
|
||||
tracing::info!(increased_limit, "limit increased");
|
||||
}
|
||||
|
||||
increased_limit
|
||||
} else {
|
||||
old_limit
|
||||
}
|
||||
}
|
||||
Overload => {
|
||||
let limit = old_limit as f32 * self.dec;
|
||||
|
||||
// Floor instead of round, so the limit reduces even with small numbers.
|
||||
// E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
|
||||
let limit = limit.floor() as usize;
|
||||
|
||||
limit.clamp(self.min, self.max)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
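For reference, the additive-increase/multiplicative-decrease rule implemented above can be written out as a small standalone function. The following C sketch is illustrative only (the proxy's actual implementation is the Rust impl above):

#include <stdbool.h>
#include <stddef.h>

static size_t
clamp_size(size_t v, size_t lo, size_t hi)
{
	return v < lo ? lo : (v > hi ? hi : v);
}

/* Additive increase when the current limit is well utilised, multiplicative
 * decrease (floored, then clamped) when an overload error was observed. */
static size_t
aimd_update(size_t old_limit, size_t in_flight, bool overloaded,
			size_t min, size_t max, size_t inc, double dec, double utilisation)
{
	if (overloaded)
		return clamp_size((size_t) ((double) old_limit * dec), min, max);	/* cast floors */
	if ((double) in_flight / (double) old_limit > utilisation)
		return clamp_size(old_limit + inc, min, max);
	return old_limit;
}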
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::rate_limiter::limit_algorithm::{
|
||||
DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn should_decrease_limit_on_overload() {
|
||||
let config = RateLimiterConfig {
|
||||
initial_limit: 10,
|
||||
algorithm: RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 1,
|
||||
max: 1500,
|
||||
inc: 10,
|
||||
dec: 0.5,
|
||||
utilisation: 0.8,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
let limiter = DynamicLimiter::new(config);
|
||||
|
||||
let token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
token.release(Outcome::Overload);
|
||||
|
||||
assert_eq!(limiter.state().limit(), 5, "overload: decrease");
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
|
||||
let config = RateLimiterConfig {
|
||||
initial_limit: 4,
|
||||
algorithm: RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 1,
|
||||
max: 1500,
|
||||
inc: 1,
|
||||
dec: 0.5,
|
||||
utilisation: 0.5,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
let limiter = DynamicLimiter::new(config);
|
||||
|
||||
let token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
let _token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
let _token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
token.release(Outcome::Success);
|
||||
assert_eq!(limiter.state().limit(), 5, "success: increase");
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
|
||||
let config = RateLimiterConfig {
|
||||
initial_limit: 4,
|
||||
algorithm: RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 1,
|
||||
max: 1500,
|
||||
inc: 10,
|
||||
dec: 0.5,
|
||||
utilisation: 0.5,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
let limiter = DynamicLimiter::new(config);
|
||||
|
||||
let token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
token.release(Outcome::Success);
|
||||
assert_eq!(
|
||||
limiter.state().limit(),
|
||||
4,
|
||||
"success: ignore when < half limit"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn should_not_change_limit_when_no_outcome() {
|
||||
let config = RateLimiterConfig {
|
||||
initial_limit: 10,
|
||||
algorithm: RateLimitAlgorithm::Aimd {
|
||||
conf: Aimd {
|
||||
min: 1,
|
||||
max: 1500,
|
||||
inc: 10,
|
||||
dec: 0.5,
|
||||
utilisation: 0.5,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
let limiter = DynamicLimiter::new(config);
|
||||
|
||||
let token = limiter
|
||||
.acquire_timeout(Duration::from_millis(1))
|
||||
.await
|
||||
.unwrap();
|
||||
drop(token);
|
||||
assert_eq!(limiter.state().limit(), 10, "ignore");
|
||||
}
|
||||
}
|
||||
@@ -232,9 +232,9 @@ impl ConnectMechanism for TokioMechanism {
|
||||
.connect_timeout(timeout);
|
||||
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
|
||||
let res = config.connect(tokio_postgres::NoTls).await;
|
||||
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
|
||||
drop(pause);
|
||||
let (client, connection) = permit.release_result(res)?;
|
||||
drop(permit);
|
||||
|
||||
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
|
||||
Ok(poll_client(
|
||||
|
||||
@@ -51,10 +51,9 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let this = self.project();
|
||||
let mut stream = this.stream;
|
||||
this.send.put(buf);
|
||||
|
||||
ready!(stream.as_mut().poll_ready(cx).map_err(io_error))?;
|
||||
|
||||
this.send.put(buf);
|
||||
match stream.as_mut().start_send(Frame::binary(this.send.split())) {
|
||||
Ok(()) => Poll::Ready(Ok(buf.len())),
|
||||
Err(e) => Poll::Ready(Err(io_error(e))),
|
||||
|
||||
@@ -54,7 +54,6 @@ build-backend = "poetry.core.masonry.api"
|
||||
exclude = [
|
||||
"^vendor/",
|
||||
"^target/",
|
||||
"test_runner/performance/pgvector/loaddata.py",
|
||||
]
|
||||
check_untyped_defs = true
|
||||
# Help mypy find imports when running against list of individual files.
|
||||
|
||||
@@ -22,7 +22,8 @@ serde_with.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
utils.workspace = true
|
||||
async-stream.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
native-tls.workspace = true
|
||||
postgres-native-tls.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
@@ -30,9 +31,6 @@ tokio-util = { workspace = true }
|
||||
futures-util.workspace = true
|
||||
itertools.workspace = true
|
||||
camino.workspace = true
|
||||
rustls.workspace = true
|
||||
rustls-native-certs.workspace = true
|
||||
once_cell.workspace = true
|
||||
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_sdk_s3::{types::ObjectIdentifier, Client};
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use tracing::{error, info, warn};
|
||||
use utils::generation::Generation;
|
||||
@@ -208,7 +208,7 @@ impl TenantObjectListing {
|
||||
&mut self,
|
||||
timeline_id: TimelineId,
|
||||
layer_file: &LayerName,
|
||||
metadata: &LayerFileMetadata,
|
||||
metadata: &IndexLayerMetadata,
|
||||
) -> bool {
|
||||
let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
|
||||
return false;
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use std::{collections::HashSet, str::FromStr, sync::Arc};
|
||||
use std::{collections::HashSet, str::FromStr};
|
||||
|
||||
use aws_sdk_s3::Client;
|
||||
use futures::stream::{StreamExt, TryStreamExt};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::{XLogFileName, PG_TLI};
|
||||
use serde::Serialize;
|
||||
@@ -71,12 +70,9 @@ pub async fn scan_safekeeper_metadata(
|
||||
"checking bucket {}, region {}, dump_db_table {}",
|
||||
bucket_config.bucket, bucket_config.region, dump_db_table
|
||||
);
|
||||
// Use rustls (Neon requires TLS)
|
||||
let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
|
||||
let client_config = rustls::ClientConfig::builder()
|
||||
.with_root_certificates(root_store)
|
||||
.with_no_client_auth();
|
||||
let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
// Use the native TLS implementation (Neon requires TLS)
|
||||
let tls_connector =
|
||||
postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new().unwrap());
|
||||
let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
|
||||
// The connection object performs the actual communication with the database,
|
||||
// so spawn it off to run on its own.
|
||||
@@ -238,11 +234,3 @@ async fn check_timeline(
|
||||
is_deleted: false,
|
||||
})
|
||||
}
|
||||
|
||||
fn load_certs() -> Result<Arc<rustls::RootCertStore>, std::io::Error> {
|
||||
let der_certs = rustls_native_certs::load_native_certs()?;
|
||||
let mut store = rustls::RootCertStore::empty();
|
||||
store.add_parsable_certificates(der_certs);
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
|
||||
|
||||
@@ -11,7 +11,7 @@ use async_stream::stream;
|
||||
use aws_sdk_s3::Client;
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use pageserver::tenant::IndexPart;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -49,8 +49,8 @@ impl SnapshotDownloader {
|
||||
&self,
|
||||
ttid: TenantShardTimelineId,
|
||||
layer_name: LayerName,
|
||||
layer_metadata: LayerFileMetadata,
|
||||
) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
|
||||
layer_metadata: IndexLayerMetadata,
|
||||
) -> anyhow::Result<(LayerName, IndexLayerMetadata)> {
|
||||
// Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
|
||||
// different layer names (remote-style has the generation suffix)
|
||||
let local_path = self.output_path.join(format!(
|
||||
@@ -110,7 +110,7 @@ impl SnapshotDownloader {
|
||||
async fn download_layers(
|
||||
&self,
|
||||
ttid: TenantShardTimelineId,
|
||||
layers: Vec<(LayerName, LayerFileMetadata)>,
|
||||
layers: Vec<(LayerName, IndexLayerMetadata)>,
|
||||
) -> anyhow::Result<()> {
|
||||
let layer_count = layers.len();
|
||||
tracing::info!("Downloading {} layers for timeline {ttid}...", layer_count);
|
||||
@@ -161,7 +161,10 @@ impl SnapshotDownloader {
|
||||
ttid: TenantShardTimelineId,
|
||||
index_part: Box<IndexPart>,
|
||||
index_part_generation: Generation,
|
||||
ancestor_layers: &mut HashMap<TenantShardTimelineId, HashMap<LayerName, LayerFileMetadata>>,
|
||||
ancestor_layers: &mut HashMap<
|
||||
TenantShardTimelineId,
|
||||
HashMap<LayerName, IndexLayerMetadata>,
|
||||
>,
|
||||
) -> anyhow::Result<()> {
|
||||
let index_bytes = serde_json::to_string(&index_part).unwrap();
|
||||
|
||||
@@ -231,7 +234,7 @@ impl SnapshotDownloader {
|
||||
// happen if this tenant has been split at some point)
|
||||
let mut ancestor_layers: HashMap<
|
||||
TenantShardTimelineId,
|
||||
HashMap<LayerName, LayerFileMetadata>,
|
||||
HashMap<LayerName, IndexLayerMetadata>,
|
||||
> = Default::default();
|
||||
|
||||
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
|
||||
|
||||
@@ -287,26 +287,6 @@ async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))
|
||||
}
|
||||
|
||||
/// Force persist control file and remove old WAL.
|
||||
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
|
||||
let tli = GlobalTimelines::get(ttid)?;
|
||||
tli.maybe_persist_control_file(true)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
tli.remove_old_wal()
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Deactivates the timeline and removes its data directory.
|
||||
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
@@ -573,10 +553,6 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
|
||||
|r| request_span(r, patch_control_file_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
|
||||
|r| request_span(r, timeline_checkpoint_handler),
|
||||
)
|
||||
// for tests
|
||||
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
|
||||
request_span(r, record_safekeeper_info)
|
||||
|
||||
@@ -11,7 +11,6 @@ use tracing::info;
|
||||
use utils::{
|
||||
id::{TenantId, TenantTimelineId, TimelineId},
|
||||
lsn::Lsn,
|
||||
pausable_failpoint,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -163,8 +162,6 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
|
||||
filenames.remove(control_file_index);
|
||||
filenames.insert(0, "safekeeper.control".to_string());
|
||||
|
||||
pausable_failpoint!("sk-pull-timeline-after-list-pausable");
|
||||
|
||||
info!(
|
||||
"downloading {} files from safekeeper {}",
|
||||
filenames.len(),
|
||||
@@ -186,13 +183,6 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
|
||||
|
||||
let mut file = tokio::fs::File::create(&file_path).await?;
|
||||
let mut response = client.get(&http_url).send().await?;
|
||||
if response.status() != reqwest::StatusCode::OK {
|
||||
bail!(
|
||||
"pulling file {} failed: status is {}",
|
||||
filename,
|
||||
response.status()
|
||||
);
|
||||
}
|
||||
while let Some(chunk) = response.chunk().await? {
|
||||
file.write_all(&chunk).await?;
|
||||
file.flush().await?;
|
||||
|
||||
@@ -15,7 +15,7 @@ pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
for tli in &tlis {
|
||||
let ttid = tli.ttid;
|
||||
async {
|
||||
if let Err(e) = tli.maybe_persist_control_file(false).await {
|
||||
if let Err(e) = tli.maybe_persist_control_file().await {
|
||||
warn!("failed to persist control file: {e}");
|
||||
}
|
||||
if let Err(e) = tli.remove_old_wal().await {
|
||||
|
||||
@@ -827,9 +827,9 @@ where
|
||||
|
||||
/// Persist control file if there is something to save and enough time
|
||||
/// passed after the last save.
|
||||
pub async fn maybe_persist_inmem_control_file(&mut self, force: bool) -> Result<bool> {
|
||||
pub async fn maybe_persist_inmem_control_file(&mut self) -> Result<bool> {
|
||||
const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
|
||||
if !force && self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
|
||||
if self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
|
||||
return Ok(false);
|
||||
}
|
||||
let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn
|
||||
|
||||
@@ -821,9 +821,9 @@ impl Timeline {
|
||||
/// passed after the last save. This helps to keep remote_consistent_lsn up
|
||||
/// to date so that storage nodes restart doesn't cause many pageserver ->
|
||||
/// safekeeper reconnections.
|
||||
pub async fn maybe_persist_control_file(self: &Arc<Self>, force: bool) -> Result<()> {
|
||||
pub async fn maybe_persist_control_file(self: &Arc<Self>) -> Result<()> {
|
||||
let mut guard = self.write_shared_state().await;
|
||||
let changed = guard.sk.maybe_persist_inmem_control_file(force).await?;
|
||||
let changed = guard.sk.maybe_persist_inmem_control_file().await?;
|
||||
guard.skip_update = !changed;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -106,7 +106,7 @@ pub async fn main_task(
|
||||
|
||||
if !is_active {
|
||||
// TODO: maybe use tokio::spawn?
|
||||
if let Err(e) = tli.maybe_persist_control_file(false).await {
|
||||
if let Err(e) = tli.maybe_persist_control_file().await {
|
||||
warn!("control file save in update_status failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,6 @@ from typing import Any, Type, TypeVar, Union
|
||||
|
||||
T = TypeVar("T", bound="Id")
|
||||
|
||||
DEFAULT_WAL_SEG_SIZE = 16 * 1024 * 1024
|
||||
|
||||
|
||||
@total_ordering
|
||||
class Lsn:
|
||||
@@ -69,9 +67,6 @@ class Lsn:
|
||||
def as_int(self) -> int:
|
||||
return self.lsn_int
|
||||
|
||||
def segment_lsn(self, seg_sz: int = DEFAULT_WAL_SEG_SIZE) -> "Lsn":
|
||||
return Lsn(self.lsn_int - (self.lsn_int % seg_sz))
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Key:
|
||||
|
||||
@@ -2667,9 +2667,7 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
tenant_id, generation=self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
|
||||
)
|
||||
|
||||
def list_layers(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> list[Path]:
|
||||
def list_layers(self, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]:
|
||||
"""
|
||||
Inspect local storage on a pageserver to discover which layer files are present.
|
||||
|
||||
@@ -3771,7 +3769,7 @@ class SafekeeperPort:
|
||||
|
||||
|
||||
@dataclass
|
||||
class Safekeeper(LogUtils):
|
||||
class Safekeeper:
|
||||
"""An object representing a running safekeeper daemon."""
|
||||
|
||||
env: NeonEnv
|
||||
@@ -3779,13 +3777,6 @@ class Safekeeper(LogUtils):
|
||||
id: int
|
||||
running: bool = False
|
||||
|
||||
def __init__(self, env: NeonEnv, port: SafekeeperPort, id: int, running: bool = False):
|
||||
self.env = env
|
||||
self.port = port
|
||||
self.id = id
|
||||
self.running = running
|
||||
self.logfile = Path(self.data_dir) / f"safekeeper-{id}.log"
|
||||
|
||||
def start(self, extra_opts: Optional[List[str]] = None) -> "Safekeeper":
|
||||
assert self.running is False
|
||||
self.env.neon_cli.safekeeper_start(self.id, extra_opts=extra_opts)
|
||||
@@ -3846,38 +3837,11 @@ class Safekeeper(LogUtils):
|
||||
port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
|
||||
)
|
||||
|
||||
def get_timeline_start_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
|
||||
timeline_status = self.http_client().timeline_status(tenant_id, timeline_id)
|
||||
timeline_start_lsn = timeline_status.timeline_start_lsn
|
||||
log.info(f"sk {self.id} timeline start LSN: {timeline_start_lsn}")
|
||||
return timeline_start_lsn
|
||||
def data_dir(self) -> str:
|
||||
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
|
||||
|
||||
def get_flush_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
|
||||
timeline_status = self.http_client().timeline_status(tenant_id, timeline_id)
|
||||
flush_lsn = timeline_status.flush_lsn
|
||||
log.info(f"sk {self.id} flush LSN: {flush_lsn}")
|
||||
return flush_lsn
|
||||
|
||||
def pull_timeline(
|
||||
self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
pull_timeline from srcs to self.
|
||||
"""
|
||||
src_https = [f"http://localhost:{sk.port.http}" for sk in srcs]
|
||||
res = self.http_client().pull_timeline(
|
||||
{"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https}
|
||||
)
|
||||
src_ids = [sk.id for sk in srcs]
|
||||
log.info(f"finished pulling timeline from {src_ids} to {self.id}")
|
||||
return res
|
||||
|
||||
@property
|
||||
def data_dir(self) -> Path:
|
||||
return self.env.repo_dir / "safekeepers" / f"sk{self.id}"
|
||||
|
||||
def timeline_dir(self, tenant_id, timeline_id) -> Path:
|
||||
return self.data_dir / str(tenant_id) / str(timeline_id)
|
||||
def timeline_dir(self, tenant_id, timeline_id) -> str:
|
||||
return os.path.join(self.data_dir(), str(tenant_id), str(timeline_id))
|
||||
|
||||
def list_segments(self, tenant_id, timeline_id) -> List[str]:
|
||||
"""
|
||||
@@ -3890,35 +3854,6 @@ class Safekeeper(LogUtils):
|
||||
segments.sort()
|
||||
return segments
|
||||
|
||||
def checkpoint_up_to(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
|
||||
"""
|
||||
Assuming pageserver(s) uploaded to s3 up to `lsn`,
|
||||
1) wait for remote_consistent_lsn and wal_backup_lsn on safekeeper to reach it.
|
||||
2) checkpoint timeline on safekeeper, which should remove WAL before this LSN.
|
||||
"""
|
||||
cli = self.http_client()
|
||||
|
||||
def are_lsns_advanced():
|
||||
stat = cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(
|
||||
f"waiting for remote_consistent_lsn and backup_lsn on sk {self.id} to reach {lsn}, currently remote_consistent_lsn={stat.remote_consistent_lsn}, backup_lsn={stat.backup_lsn}"
|
||||
)
|
||||
assert stat.remote_consistent_lsn >= lsn and stat.backup_lsn >= lsn.segment_lsn()
|
||||
|
||||
# xxx: max wait is long because we might be waiting for reconnection from
|
||||
# pageserver to this safekeeper
|
||||
wait_until(30, 1, are_lsns_advanced)
|
||||
cli.checkpoint(tenant_id, timeline_id)
|
||||
|
||||
def wait_until_paused(self, failpoint: str):
|
||||
msg = f"at failpoint {failpoint}"
|
||||
|
||||
def paused():
|
||||
log.info(f"waiting for hitting failpoint {failpoint}")
|
||||
self.assert_log_contains(msg)
|
||||
|
||||
wait_until(20, 0.5, paused)
|
||||
|
||||
|
||||
class S3Scrubber:
|
||||
def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None):
|
||||
|
||||
@@ -177,13 +177,6 @@ class SafekeeperHttpClient(requests.Session):
|
||||
)
|
||||
res.raise_for_status()
|
||||
|
||||
def checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
|
||||
json={},
|
||||
)
|
||||
res.raise_for_status()
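The `checkpoint` helper above wraps the safekeeper's new checkpoint endpoint added earlier in this diff (force-persist the control file, then remove old WAL). A hedged example of hitting the endpoint directly with `requests`; the address and IDs are placeholders:

```python
import requests

SAFEKEEPER_HTTP = "http://localhost:7676"  # placeholder safekeeper HTTP address
tenant_id = "<tenant_id>"                  # placeholder
timeline_id = "<timeline_id>"              # placeholder

# Force persist the control file and trim old WAL for one timeline.
res = requests.post(
    f"{SAFEKEEPER_HTTP}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
    json={},
)
res.raise_for_status()
```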
|
||||
|
||||
# only_local doesn't remove segments in the remote storage.
|
||||
def timeline_delete(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
|
||||
|
||||
@@ -196,7 +196,7 @@ def query_scalar(cur: cursor, query: str) -> Any:
|
||||
|
||||
|
||||
# Traverse directory to get total size.
|
||||
def get_dir_size(path: Path) -> int:
|
||||
def get_dir_size(path: str) -> int:
|
||||
"""Return size in bytes."""
|
||||
totalbytes = 0
|
||||
for root, _dirs, files in os.walk(path):
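The hunk above cuts off inside the `os.walk` loop; for reference, a minimal completion consistent with the new `Path` signature (a sketch, not necessarily the exact body in the repo):

```python
import os
from pathlib import Path

def get_dir_size(path: Path) -> int:
    """Return the total size of all files under `path`, in bytes."""
    totalbytes = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            totalbytes += os.path.getsize(os.path.join(root, name))
    return totalbytes
```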
|
||||
@@ -560,25 +560,3 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: Set[str
|
||||
|
||||
elapsed = time.time() - started_at
|
||||
log.info(f"assert_pageserver_backups_equal completed in {elapsed}s")
|
||||
|
||||
|
||||
class PropagatingThread(threading.Thread):
|
||||
_target: Any
|
||||
_args: Any
|
||||
_kwargs: Any
|
||||
"""
|
||||
Simple Thread wrapper with join() propagating the possible exception in the thread.
|
||||
"""
|
||||
|
||||
def run(self):
|
||||
self.exc = None
|
||||
try:
|
||||
self.ret = self._target(*self._args, **self._kwargs)
|
||||
except BaseException as e:
|
||||
self.exc = e
|
||||
|
||||
def join(self, timeout=None):
|
||||
super(PropagatingThread, self).join(timeout)
|
||||
if self.exc:
|
||||
raise self.exc
|
||||
return self.ret
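`PropagatingThread.join()` returns the target's result and re-raises any exception that escaped the target inside the thread. A small usage sketch; `flaky_worker` is made up for illustration:

```python
from fixtures.utils import PropagatingThread  # import path as used elsewhere in this test suite

def flaky_worker(x: int) -> int:
    # Hypothetical target: fails for odd inputs.
    if x % 2:
        raise ValueError(f"odd input: {x}")
    return x * 2

t = PropagatingThread(target=flaky_worker, args=(2,))
t.start()
assert t.join() == 4  # join() hands back the target's return value

t = PropagatingThread(target=flaky_worker, args=(3,))
t.start()
try:
    t.join()
except ValueError as e:
    print(f"propagated from the thread: {e}")
```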
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
|
||||
\set ECHO queries
|
||||
\timing
|
||||
|
||||
-- prepare test table
|
||||
DROP TABLE IF EXISTS hnsw_test_table;
|
||||
CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;
|
||||
INSERT INTO hnsw_test_table SELECT * FROM documents;
|
||||
CREATE INDEX ON hnsw_test_table (_id); -- needed later for random tuple queries
|
||||
-- tune index build params
|
||||
SET max_parallel_maintenance_workers = 7;
|
||||
SET maintenance_work_mem = '8GB';
|
||||
-- create HNSW index for the supported distance metrics
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_cosine_ops);
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_ip_ops);
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);
|
||||
CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);
|
||||
-- note: in a second psql session we can monitor the progress of the index build phases using
|
||||
-- the following query:
|
||||
-- SELECT phase, round(100.0 * blocks_done / nullif(blocks_total, 0), 1) AS "%" FROM pg_stat_progress_create_index;
|
||||
|
||||
-- show all indexes built on the table
|
||||
SELECT
|
||||
idx.relname AS index_name,
|
||||
tbl.relname AS table_name,
|
||||
am.amname AS access_method,
|
||||
a.attname AS column_name,
|
||||
opc.opcname AS operator_class
|
||||
FROM
|
||||
pg_index i
|
||||
JOIN
|
||||
pg_class idx ON idx.oid = i.indexrelid
|
||||
JOIN
|
||||
pg_class tbl ON tbl.oid = i.indrelid
|
||||
JOIN
|
||||
pg_am am ON am.oid = idx.relam
|
||||
JOIN
|
||||
pg_attribute a ON a.attrelid = tbl.oid AND a.attnum = ANY(i.indkey)
|
||||
JOIN
|
||||
pg_opclass opc ON opc.oid = i.indclass[0]
|
||||
WHERE
|
||||
tbl.relname = 'hnsw_test_table'
|
||||
AND a.attname = 'embeddings';
|
||||
|
||||
-- show table sizes
|
||||
\dt+
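As the comment in the script above suggests, index build progress can be watched from a second session via `pg_stat_progress_create_index`. A hedged Python sketch of that polling loop; the connection string is a placeholder:

```python
import time
import psycopg2

CONNSTR = "postgresql://user:<password>@<host>/neondb"  # placeholder

conn = psycopg2.connect(CONNSTR)
conn.autocommit = True
with conn.cursor() as cur:
    while True:
        cur.execute(
            """
            SELECT phase,
                   round(100.0 * blocks_done / nullif(blocks_total, 0), 1) AS pct
            FROM pg_stat_progress_create_index
            """
        )
        rows = cur.fetchall()
        if not rows:
            break  # no index build in progress (anymore)
        for phase, pct in rows:
            print(f"{phase}: {pct}%")
        time.sleep(5)
conn.close()
```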
|
||||
@@ -1,52 +0,0 @@
|
||||
|
||||
\set ECHO queries
|
||||
\timing
|
||||
|
||||
-- prepare test table
|
||||
DROP TABLE IF EXISTS ivfflat_test_table;
|
||||
CREATE TABLE ivfflat_test_table AS TABLE documents WITH NO DATA;
|
||||
INSERT INTO ivfflat_test_table SELECT * FROM documents;
|
||||
CREATE INDEX ON ivfflat_test_table (_id); -- needed later for random tuple queries
|
||||
-- tune index build params
|
||||
SET max_parallel_maintenance_workers = 7;
|
||||
SET maintenance_work_mem = '8GB';
|
||||
-- create ivfflat index for the supported distance metrics
|
||||
-- the formula for lists is # rows / 1000, or sqrt(# rows) if # rows > 1 million (see the helper sketch after this script)
|
||||
-- we have 1 million embeddings of vector size 1536 in column embeddings of table documents
|
||||
-- so we use 1000 lists
|
||||
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_l2_ops) WITH (lists = 1000);
|
||||
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_ip_ops) WITH (lists = 1000);
|
||||
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_cosine_ops) WITH (lists = 1000);
|
||||
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings::halfvec(1536) halfvec_l2_ops) WITH (lists = 1000);
|
||||
CREATE INDEX ON ivfflat_test_table
|
||||
USING ivfflat ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops) WITH (lists = 1000);
|
||||
|
||||
\d ivfflat_test_table
|
||||
|
||||
|
||||
-- show all indexes built on the table
|
||||
SELECT
|
||||
idx.relname AS index_name,
|
||||
tbl.relname AS table_name,
|
||||
am.amname AS access_method,
|
||||
a.attname AS column_name,
|
||||
opc.opcname AS operator_class
|
||||
FROM
|
||||
pg_index i
|
||||
JOIN
|
||||
pg_class idx ON idx.oid = i.indexrelid
|
||||
JOIN
|
||||
pg_class tbl ON tbl.oid = i.indrelid
|
||||
JOIN
|
||||
pg_am am ON am.oid = idx.relam
|
||||
JOIN
|
||||
pg_attribute a ON a.attrelid = tbl.oid AND a.attnum = ANY(i.indkey)
|
||||
JOIN
|
||||
pg_opclass opc ON opc.oid = i.indclass[0]
|
||||
WHERE
|
||||
tbl.relname = 'ivfflat_test_table'
|
||||
AND a.attname = 'embeddings';
|
||||
-- show table sizes
|
||||
\dt+
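The lists heuristic in the comments above (rows / 1000 up to roughly 1M rows, sqrt(rows) beyond that) is easy to misapply; a small Python helper sketching the same rule:

```python
import math

def ivfflat_lists(n_rows: int) -> int:
    """pgvector's rule of thumb: rows/1000, or sqrt(rows) for more than 1M rows."""
    if n_rows > 1_000_000:
        return int(math.sqrt(n_rows))
    return max(1, n_rows // 1000)

assert ivfflat_lists(1_000_000) == 1000  # matches the 1M-embedding dataset used here
```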
|
||||
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
# Source of the dataset for pgvector tests
|
||||
|
||||
This readme was copied from https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-1M
|
||||
|
||||
## Download the parquet files
|
||||
|
||||
```bash
|
||||
brew install git-lfs
|
||||
git-lfs clone https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-1M
|
||||
```
|
||||
|
||||
## Load into postgres:
|
||||
|
||||
see loaddata.py in this directory
|
||||
|
||||
## Rest of dataset card as on huggingface
|
||||
|
||||
---
|
||||
dataset_info:
|
||||
features:
|
||||
- name: _id
|
||||
dtype: string
|
||||
- name: title
|
||||
dtype: string
|
||||
- name: text
|
||||
dtype: string
|
||||
- name: text-embedding-3-large-1536-embedding
|
||||
sequence: float64
|
||||
splits:
|
||||
- name: train
|
||||
num_bytes: 12679725776
|
||||
num_examples: 1000000
|
||||
download_size: 9551862565
|
||||
dataset_size: 12679725776
|
||||
configs:
|
||||
- config_name: default
|
||||
data_files:
|
||||
- split: train
|
||||
path: data/train-*
|
||||
license: mit
|
||||
task_categories:
|
||||
- feature-extraction
|
||||
language:
|
||||
- en
|
||||
size_categories:
|
||||
- 1M<n<10M
|
||||
---
|
||||
|
||||
|
||||
1M OpenAI Embeddings: text-embedding-3-large 1536 dimensions
|
||||
|
||||
- Created: February 2024.
|
||||
- Text used for Embedding: title (string) + text (string)
|
||||
- Embedding Model: OpenAI text-embedding-3-large
|
||||
- This dataset was generated from the first 1M entries of https://huggingface.co/datasets/BeIR/dbpedia-entity, extracted by @KShivendu_
|
||||
@@ -1,72 +0,0 @@
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import psycopg2
|
||||
from pgvector.psycopg2 import register_vector
|
||||
from psycopg2.extras import execute_values
|
||||
|
||||
|
||||
def print_usage():
|
||||
print("Usage: loaddata.py <CONNSTR> <DATADIR>")
|
||||
|
||||
|
||||
def main(conn_str, directory_path):
|
||||
# Connection to PostgreSQL
|
||||
with psycopg2.connect(conn_str) as conn:
|
||||
with conn.cursor() as cursor:
|
||||
# Run SQL statements
|
||||
cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;")
|
||||
register_vector(conn)
|
||||
cursor.execute("DROP TABLE IF EXISTS documents;")
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE TABLE documents (
|
||||
_id TEXT PRIMARY KEY,
|
||||
title TEXT,
|
||||
text TEXT,
|
||||
embeddings vector(1536) -- text-embedding-3-large-1536-embedding (OpenAI)
|
||||
);
|
||||
"""
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# List and sort Parquet files
|
||||
parquet_files = sorted(Path(directory_path).glob("*.parquet"))
|
||||
|
||||
for file in parquet_files:
|
||||
print(f"Loading {file} into PostgreSQL")
|
||||
df = pd.read_parquet(file)
|
||||
|
||||
print(df.head())
|
||||
|
||||
data_list = [
|
||||
(
|
||||
row["_id"],
|
||||
row["title"],
|
||||
row["text"],
|
||||
np.array(row["text-embedding-3-large-1536-embedding"]),
|
||||
)
|
||||
for index, row in df.iterrows()
|
||||
]
|
||||
# Use execute_values to perform batch insertion
|
||||
execute_values(
|
||||
cursor,
|
||||
"INSERT INTO documents (_id, title, text, embeddings) VALUES %s",
|
||||
data_list,
|
||||
)
|
||||
# Commit after we insert all embeddings
|
||||
conn.commit()
|
||||
|
||||
print(f"Loaded {file} into PostgreSQL")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) != 3:
|
||||
print_usage()
|
||||
sys.exit(1)
|
||||
|
||||
conn_str = sys.argv[1]
|
||||
directory_path = sys.argv[2]
|
||||
main(conn_str, directory_path)
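For completeness, `main` can also be called directly from Python instead of via the command line; both arguments below are placeholders:

```python
# Placeholders: a real connection string and the directory with the downloaded parquet files.
main(
    "postgresql://neondb_owner:<password>@<endpoint>/neondb?sslmode=require",
    "dbpedia-entities-openai3-text-embedding-3-large-1536-1M",
)
```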
|
||||
@@ -1,10 +0,0 @@
|
||||
with x (x) as (
|
||||
select "embeddings" as x
|
||||
from hnsw_test_table
|
||||
TABLESAMPLE SYSTEM (1)
|
||||
LIMIT 1
|
||||
)
|
||||
SELECT title, "embeddings" <=> (select x from x) as distance
|
||||
FROM hnsw_test_table
|
||||
ORDER BY 2
|
||||
LIMIT 30;
|
||||
@@ -1,13 +0,0 @@
|
||||
-- run with pooled connection
|
||||
-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql "postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
|
||||
|
||||
with x (x) as (
|
||||
select "embeddings" as x
|
||||
from hnsw_test_table
|
||||
TABLESAMPLE SYSTEM (1)
|
||||
LIMIT 1
|
||||
)
|
||||
SELECT title, "embeddings" <=> (select x from x) as distance
|
||||
FROM hnsw_test_table
|
||||
ORDER BY 2
|
||||
LIMIT 30;
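The comment at the top of this script spells out the intended pgbench invocation against a pooled endpoint. A hedged Python sketch that drives the same run via `subprocess`, with flags mirroring the `run_pgbench` call later in this diff; the connection string is a placeholder:

```python
import subprocess

CONNSTR = "postgresql://neondb_owner:<password>@<pooler-host>/neondb?sslmode=require"  # placeholder

subprocess.run(
    [
        "pgbench",
        "-f", "test_runner/performance/pgvector/pgbench_custom_script_pgvector_hsnw_queries.sql",
        "-c100",                 # 100 client connections (hence the pooled endpoint)
        "-j20",                  # 20 worker threads
        "-T300",                 # run for 300 seconds
        "-P2",                   # progress line every 2 seconds
        "--protocol=prepared",
        "--progress-timestamp",
        CONNSTR,
    ],
    check=True,
)
```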
|
||||
@@ -100,25 +100,6 @@ QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
# A list of pgvector HNSW index builds to run.
|
||||
# Please do not alter the label for the query, as it is used to identify it.
|
||||
#
|
||||
# Disable auto formatting for the list of queries so that it's easier to read
|
||||
# fmt: off
|
||||
PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"),
|
||||
LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"),
|
||||
LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"),
|
||||
LabelledQuery("PGV3", r"CREATE INDEX ON hnsw_test_table (_id);"),
|
||||
LabelledQuery("PGV4", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_cosine_ops);"),
|
||||
LabelledQuery("PGV5", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_ip_ops);"),
|
||||
LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"),
|
||||
LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"),
|
||||
LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"),
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
EXPLAIN_STRING: str = "EXPLAIN (ANALYZE, VERBOSE, BUFFERS, COSTS, SETTINGS, FORMAT JSON)"
|
||||
|
||||
|
||||
@@ -264,18 +245,3 @@ def test_clickbench_collect_pg_stat_statements(remote_compare: RemoteCompare):
|
||||
log.info("Collecting pg_stat_statements")
|
||||
query = LabelledQuery("Q_COLLECT_PG_STAT_STATEMENTS", r"SELECT * from pg_stat_statements;")
|
||||
run_psql(remote_compare, query, times=1, explain=False)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("query", PGVECTOR_QUERIES)
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgvector_indexing(query: LabelledQuery, remote_compare: RemoteCompare):
|
||||
"""
|
||||
A pgvector test that exercises HNSW index build performance and parallelism.
|
||||
|
||||
The DB is prepared manually in advance.
|
||||
See
|
||||
- test_runner/performance/pgvector/README.md
|
||||
- test_runner/performance/pgvector/loaddata.py
|
||||
- test_runner/performance/pgvector/HNSW_build.sql
|
||||
"""
|
||||
run_psql(remote_compare, query, times=1, explain=False)
|
||||
|
||||
@@ -17,7 +17,6 @@ class PgBenchLoadType(enum.Enum):
|
||||
INIT = "init"
|
||||
SIMPLE_UPDATE = "simple-update"
|
||||
SELECT_ONLY = "select-only"
|
||||
PGVECTOR_HNSW = "pgvector-hnsw"
|
||||
|
||||
|
||||
def utc_now_timestamp() -> int:
|
||||
@@ -133,26 +132,6 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
password=password,
|
||||
)
|
||||
|
||||
if workload_type == PgBenchLoadType.PGVECTOR_HNSW:
|
||||
# Run pgvector HNSW workload
|
||||
run_pgbench(
|
||||
env,
|
||||
"pgvector-hnsw",
|
||||
[
|
||||
"pgbench",
|
||||
"-f",
|
||||
"test_runner/performance/pgvector/pgbench_custom_script_pgvector_hsnw_queries.sql",
|
||||
"-c100",
|
||||
"-j20",
|
||||
f"-T{duration}",
|
||||
"-P2",
|
||||
"--protocol=prepared",
|
||||
"--progress-timestamp",
|
||||
connstr,
|
||||
],
|
||||
password=password,
|
||||
)
|
||||
|
||||
env.report_size()
|
||||
|
||||
|
||||
@@ -222,13 +201,3 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||
|
||||
# The following test runs on an existing database that has the pgvector extension installed
|
||||
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
|
||||
#
|
||||
# Run this pgbench test against an existing remote Postgres cluster with the necessary setup.
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int):
|
||||
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
|
||||
|
||||
#
|
||||
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce an error
|
||||
#
|
||||
def test_gin_redo(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
time.sleep(1)
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
con = primary.connect()
|
||||
cur = con.cursor()
|
||||
cur.execute("create table gin_test_tbl(id integer, i int4[])")
|
||||
cur.execute("create index gin_test_idx on gin_test_tbl using gin (i)")
|
||||
cur.execute("insert into gin_test_tbl select g,array[3, 1, g] from generate_series(1, 10000) g")
|
||||
cur.execute("delete from gin_test_tbl where id % 2 = 0")
|
||||
cur.execute("vacuum gin_test_tbl")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
@@ -1,282 +0,0 @@
|
||||
from contextlib import closing
|
||||
from typing import Set
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonPageserver
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from psycopg2.errors import QueryCanceled
|
||||
|
||||
CRITICAL_PG_PS_WAIT_FAILPOINTS: Set[str] = {
|
||||
"ps::connection-start::pre-login",
|
||||
"ps::connection-start::startup-packet",
|
||||
"ps::connection-start::process-query",
|
||||
"ps::handle-pagerequest-message::exists",
|
||||
"ps::handle-pagerequest-message::nblocks",
|
||||
"ps::handle-pagerequest-message::getpage",
|
||||
"ps::handle-pagerequest-message::dbsize",
|
||||
# We don't yet have a good way to on-demand guarantee the download of an
|
||||
# SLRU segment, so that's disabled for now.
|
||||
# "ps::handle-pagerequest-message::slrusegment",
|
||||
}
|
||||
|
||||
PG_PS_START_FAILPOINTS = {
|
||||
"ps::connection-start::pre-login",
|
||||
"ps::connection-start::startup-packet",
|
||||
"ps::connection-start::process-query",
|
||||
}
|
||||
SMGR_EXISTS = "ps::handle-pagerequest-message::exists"
|
||||
SMGR_NBLOCKS = "ps::handle-pagerequest-message::nblocks"
|
||||
SMGR_GETPAGE = "ps::handle-pagerequest-message::getpage"
|
||||
SMGR_DBSIZE = "ps::handle-pagerequest-message::dbsize"
|
||||
|
||||
"""
|
||||
Test that we can handle connection delays and cancellations at various
|
||||
unfortunate connection startup and request states.
|
||||
"""
|
||||
|
||||
|
||||
def test_cancellations(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
ps = env.pageserver
|
||||
ps_http = ps.http_client()
|
||||
ps_http.is_testing_enabled_or_skip()
|
||||
|
||||
env.neon_cli.create_branch("test_config", "empty")
|
||||
|
||||
# We don't want to have any racy behaviour with autovacuum IOs
|
||||
ep = env.endpoints.create_start(
|
||||
"test_config",
|
||||
config_lines=[
|
||||
"autovacuum = off",
|
||||
"shared_buffers = 128MB",
|
||||
],
|
||||
)
|
||||
|
||||
with closing(ep.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE test1 AS
|
||||
SELECT id, sha256(id::text::bytea) payload
|
||||
FROM generate_series(1, 1024::bigint) p(id);
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE test2 AS
|
||||
SELECT id, sha256(id::text::bytea) payload
|
||||
FROM generate_series(1025, 2048::bigint) p(id);
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
VACUUM (ANALYZE, FREEZE) test1, test2;
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE EXTENSION pg_buffercache;
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE EXTENSION pg_prewarm;
|
||||
"""
|
||||
)
|
||||
|
||||
# data preparation is now complete, with 2 disjoint tables that aren't
|
||||
# preloaded into any caches.
|
||||
|
||||
ep.stop()
|
||||
|
||||
for failpoint in CRITICAL_PG_PS_WAIT_FAILPOINTS:
|
||||
connect_works_correctly(failpoint, ep, ps, ps_http)
|
||||
|
||||
|
||||
ENABLED_FAILPOINTS: Set[str] = set()
|
||||
|
||||
|
||||
def connect_works_correctly(
|
||||
failpoint: str, ep: Endpoint, ps: NeonPageserver, ps_http: PageserverHttpClient
|
||||
):
|
||||
log.debug("Starting work on %s", failpoint)
|
||||
# All queries we use should finish (incl. IO) within 500ms,
|
||||
# including all their IO.
|
||||
# This allows us to use `SET statement_timeout` to let the query
|
||||
# timeout system cancel queries, rather than us having to go
|
||||
# through the most annoying effort of manual query cancellation
|
||||
# in psycopg2.
|
||||
options = "-cstatement_timeout=500ms -ceffective_io_concurrency=1"
|
||||
|
||||
ep.start()
|
||||
|
||||
def fp_enable():
|
||||
global ENABLED_FAILPOINTS
|
||||
ps_http.configure_failpoints(
|
||||
[
|
||||
(failpoint, "pause"),
|
||||
]
|
||||
)
|
||||
ENABLED_FAILPOINTS = ENABLED_FAILPOINTS | {failpoint}
|
||||
log.info(
|
||||
'Enabled failpoint "%s", current_active=%s', failpoint, ENABLED_FAILPOINTS, stacklevel=2
|
||||
)
|
||||
|
||||
def fp_disable():
|
||||
global ENABLED_FAILPOINTS
|
||||
ps_http.configure_failpoints(
|
||||
[
|
||||
(failpoint, "off"),
|
||||
]
|
||||
)
|
||||
ENABLED_FAILPOINTS = ENABLED_FAILPOINTS - {failpoint}
|
||||
log.info(
|
||||
'Disabled failpoint "%s", current_active=%s',
|
||||
failpoint,
|
||||
ENABLED_FAILPOINTS,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
def check_buffers(cur):
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT n.nspname AS nspname
|
||||
, c.relname AS relname
|
||||
, count(*) AS count
|
||||
FROM pg_buffercache b
|
||||
JOIN pg_class c
|
||||
ON b.relfilenode = pg_relation_filenode(c.oid) AND
|
||||
b.reldatabase = (SELECT oid FROM pg_database WHERE datname = current_database())
|
||||
JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE c.oid IN ('test1'::regclass::oid, 'test2'::regclass::oid)
|
||||
GROUP BY n.nspname, c.relname
|
||||
ORDER BY 3 DESC
|
||||
LIMIT 10
|
||||
"""
|
||||
)
|
||||
return cur.fetchone()
|
||||
|
||||
def exec_may_cancel(query, cursor, result, cancels):
|
||||
if cancels:
|
||||
with pytest.raises(QueryCanceled):
|
||||
cursor.execute(query)
|
||||
assert cursor.fetchone() == result
|
||||
else:
|
||||
cursor.execute(query)
|
||||
assert cursor.fetchone() == result
|
||||
|
||||
fp_disable()
|
||||
|
||||
# Warm caches required for new connections, so that they can run without
|
||||
# requiring catalog reads.
|
||||
with closing(ep.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT 1;
|
||||
"""
|
||||
)
|
||||
assert cur.fetchone() == (1,)
|
||||
|
||||
assert check_buffers(cur) is None
|
||||
# Ensure all caches required for connection start are correctly
|
||||
# filled, so that we don't have any "accidents" in this test run
|
||||
# caused by changes in connection startup plans that require
|
||||
# requests to the PageServer.
|
||||
cur.execute(
|
||||
"""
|
||||
select array_agg(distinct (pg_prewarm(c.oid::regclass, 'buffer') >= 0))
|
||||
from pg_class c
|
||||
where c.oid < 16384 AND c.relkind IN ('i', 'r');
|
||||
"""
|
||||
)
|
||||
assert cur.fetchone() == ([True],)
|
||||
|
||||
# Enable failpoint
|
||||
fp_enable()
|
||||
|
||||
with closing(ep.connect(options=options, autocommit=True)) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SHOW statement_timeout;")
|
||||
assert cur.fetchone() == ("500ms",)
|
||||
assert check_buffers(cur) is None
|
||||
exec_may_cancel(
|
||||
"""
|
||||
SELECT min(id) FROM test1;
|
||||
""",
|
||||
cur,
|
||||
(1,),
|
||||
failpoint in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_DBSIZE}),
|
||||
)
|
||||
|
||||
fp_disable()
|
||||
|
||||
with closing(ep.connect(options=options, autocommit=True)) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Do a select on the data, putting some buffers into the prefetch
|
||||
# queue.
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT count(id) FROM (select * from test1 LIMIT 256) a;
|
||||
"""
|
||||
)
|
||||
assert cur.fetchone() == (256,)
|
||||
|
||||
ps.stop()
|
||||
ps.start()
|
||||
fp_enable()
|
||||
|
||||
exec_may_cancel(
|
||||
"""
|
||||
SELECT COUNT(id) FROM test1;
|
||||
""",
|
||||
cur,
|
||||
(1024,),
|
||||
failpoint
|
||||
in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_NBLOCKS, SMGR_DBSIZE}),
|
||||
)
|
||||
|
||||
with closing(ep.connect(options=options, autocommit=True)) as conn:
|
||||
with conn.cursor() as cur:
|
||||
exec_may_cancel(
|
||||
"""
|
||||
SELECT COUNT(id) FROM test2;
|
||||
""",
|
||||
cur,
|
||||
(1024,),
|
||||
failpoint in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_DBSIZE}),
|
||||
)
|
||||
|
||||
fp_disable()
|
||||
fp_enable()
|
||||
|
||||
exec_may_cancel(
|
||||
"""
|
||||
SELECT 0 < pg_database_size(CURRENT_DATABASE());
|
||||
""",
|
||||
cur,
|
||||
(True,),
|
||||
failpoint
|
||||
in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_GETPAGE, SMGR_NBLOCKS}),
|
||||
)
|
||||
|
||||
fp_disable()
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT count(id), count(distinct payload), min(id), max(id), sum(id) FROM test2;
|
||||
"""
|
||||
)
|
||||
|
||||
assert cur.fetchone() == (1024, 1024, 1025, 2048, 1573376)
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT count(id), count(distinct payload), min(id), max(id), sum(id) FROM test1;
|
||||
"""
|
||||
)
|
||||
|
||||
assert cur.fetchone() == (1024, 1024, 1, 1024, 524800)
|
||||
|
||||
ep.stop()
|
||||
@@ -177,16 +177,7 @@ def test_sharding_split_unsharded(
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"failpoint",
|
||||
[
|
||||
None,
|
||||
"compact-shard-ancestors-localonly",
|
||||
"compact-shard-ancestors-enqueued",
|
||||
"compact-shard-ancestors-persistent",
|
||||
],
|
||||
)
|
||||
def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: Optional[str]):
|
||||
def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that after a split, we clean up parent layer data in the child shards via compaction.
|
||||
"""
|
||||
@@ -205,11 +196,6 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
|
||||
"image_layer_creation_check_threshold": "0",
|
||||
}
|
||||
|
||||
neon_env_builder.storage_controller_config = {
|
||||
# Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
|
||||
"max_unavailable": "300s"
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -227,10 +213,6 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
|
||||
# Split one shard into two
|
||||
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)
|
||||
|
||||
# Let all shards move into their stable locations, so that during subsequent steps we
|
||||
# don't have reconciles in progress (simpler to reason about what messages we expect in logs)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
# Check we got the shard IDs we expected
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None
|
||||
@@ -255,90 +237,6 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
|
||||
# Compaction shouldn't make anything unreadable
|
||||
workload.validate()
|
||||
|
||||
# Force a generation increase: layer rewrites are a long-term thing and only happen after
|
||||
# the generation has increased.
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
# Cleanup part 2: once layers are outside the PITR window, they will be rewritten if they are partially redundant
|
||||
env.storage_controller.pageserver_api().set_tenant_config(tenant_id, {"pitr_interval": "0s"})
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
|
||||
# Apply failpoints for the layer-rewriting phase: this is the area of code that has sensitive behavior
|
||||
# across restarts, as we will have local layer files that temporarily disagree with the remote metadata
|
||||
# for the same local layer file name.
|
||||
if failpoint is not None:
|
||||
ps.http_client().configure_failpoints((failpoint, "exit"))
|
||||
|
||||
# Do a GC to update gc_info (compaction uses this to decide whether a layer is to be rewritten)
|
||||
# Set gc_horizon=0 to let PITR horizon control GC cutoff exclusively.
|
||||
ps.http_client().timeline_gc(shard, timeline_id, gc_horizon=0)
|
||||
|
||||
# We will compare stats before + after compaction
|
||||
detail_before = ps.http_client().timeline_detail(shard, timeline_id)
|
||||
|
||||
# Invoke compaction: this should rewrite layers that are behind the pitr horizon
|
||||
try:
|
||||
ps.http_client().timeline_compact(shard, timeline_id)
|
||||
except requests.ConnectionError as e:
|
||||
if failpoint is None:
|
||||
raise e
|
||||
else:
|
||||
log.info(f"Compaction failed (failpoint={failpoint}): {e}")
|
||||
|
||||
if failpoint in (
|
||||
"compact-shard-ancestors-localonly",
|
||||
"compact-shard-ancestors-enqueued",
|
||||
):
|
||||
# If we left local files that don't match remote metadata, we expect warnings on next startup
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*removing local file .+ because it has unexpected length.*"
|
||||
)
|
||||
|
||||
# Post-failpoint: we check that the pageserver comes back online happily.
|
||||
env.pageserver.running = False
|
||||
env.pageserver.start()
|
||||
else:
|
||||
assert failpoint is None # We shouldn't reach success path if a failpoint was set
|
||||
|
||||
detail_after = ps.http_client().timeline_detail(shard, timeline_id)
|
||||
|
||||
# Physical size should shrink because layers are smaller
|
||||
assert detail_after["current_physical_size"] < detail_before["current_physical_size"]
|
||||
|
||||
# Validate size statistics
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
timeline_info = ps.http_client().timeline_detail(shard, timeline_id)
|
||||
reported_size = timeline_info["current_physical_size"]
|
||||
layer_paths = ps.list_layers(shard, timeline_id)
|
||||
measured_size = 0
|
||||
for p in layer_paths:
|
||||
abs_path = ps.timeline_dir(shard, timeline_id) / p
|
||||
measured_size += os.stat(abs_path).st_size
|
||||
|
||||
log.info(
|
||||
f"shard {shard} reported size {reported_size}, measured size {measured_size} ({len(layer_paths)} layers)"
|
||||
)
|
||||
|
||||
if failpoint in (
|
||||
"compact-shard-ancestors-localonly",
|
||||
"compact-shard-ancestors-enqueued",
|
||||
):
|
||||
# If we injected a failure between local rewrite and remote upload, then after
|
||||
# restart we may end up with neither version of the file on local disk (the new file
|
||||
# is cleaned up because it doesn't match remote metadata). So local size isn't
|
||||
# necessarily going to match remote physical size.
|
||||
continue
|
||||
|
||||
assert measured_size == reported_size
|
||||
|
||||
# Compaction shouldn't make anything unreadable
|
||||
workload.validate()
|
||||
|
||||
|
||||
def test_sharding_split_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
|
||||
@@ -23,6 +23,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.neon_fixtures import (
|
||||
Endpoint,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
PgBin,
|
||||
@@ -47,7 +48,7 @@ from fixtures.remote_storage import (
|
||||
)
|
||||
from fixtures.safekeeper.http import SafekeeperHttpClient
|
||||
from fixtures.safekeeper.utils import are_walreceivers_absent
|
||||
from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background
|
||||
from fixtures.utils import get_dir_size, query_scalar, start_in_background
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint(
|
||||
@@ -359,7 +360,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
|
||||
# We will wait for first segment removal. Make sure they exist for starter.
|
||||
first_segments = [
|
||||
sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
|
||||
os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id), "000000010000000000000001")
|
||||
for sk in env.safekeepers
|
||||
]
|
||||
assert all(os.path.exists(p) for p in first_segments)
|
||||
@@ -444,7 +445,7 @@ def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: Tim
|
||||
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
|
||||
http_cli = sk.http_client()
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
sk_wal_size = get_dir_size(sk.timeline_dir(tenant_id, timeline_id))
|
||||
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
|
||||
sk_wal_size_mb = sk_wal_size / 1024 / 1024
|
||||
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
|
||||
return sk_wal_size_mb <= target_size_mb
|
||||
@@ -590,10 +591,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# save the last (partial) file to put it back after recreation; others will be fetched from s3
|
||||
sk = env.safekeepers[0]
|
||||
tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id)
|
||||
tli_dir = Path(sk.data_dir()) / str(tenant_id) / str(timeline_id)
|
||||
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
|
||||
f_partial_path = tli_dir / f_partial
|
||||
f_partial_saved = Path(sk.data_dir) / f_partial.name
|
||||
f_partial_saved = Path(sk.data_dir()) / f_partial.name
|
||||
f_partial_path.rename(f_partial_saved)
|
||||
|
||||
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
|
||||
@@ -615,7 +616,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
|
||||
cli = sk.http_client()
|
||||
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
|
||||
f_partial_path = (
|
||||
Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
|
||||
Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
|
||||
)
|
||||
shutil.copy(f_partial_saved, f_partial_path)
|
||||
|
||||
@@ -1131,8 +1132,8 @@ def cmp_sk_wal(sks: List[Safekeeper], tenant_id: TenantId, timeline_id: Timeline
|
||||
)
|
||||
|
||||
for f in mismatch:
|
||||
f1 = sk0.timeline_dir(tenant_id, timeline_id) / f
|
||||
f2 = sk.timeline_dir(tenant_id, timeline_id) / f
|
||||
f1 = os.path.join(sk0.timeline_dir(tenant_id, timeline_id), f)
|
||||
f2 = os.path.join(sk.timeline_dir(tenant_id, timeline_id), f)
|
||||
stdout_filename = f"{f2}.filediff"
|
||||
|
||||
with open(stdout_filename, "w") as stdout_f:
|
||||
@@ -1630,7 +1631,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t(key int primary key)")
|
||||
sk = env.safekeepers[0]
|
||||
sk_data_dir = sk.data_dir
|
||||
sk_data_dir = Path(sk.data_dir())
|
||||
if not auth_enabled:
|
||||
sk_http = sk.http_client()
|
||||
sk_http_other = sk_http
|
||||
@@ -1723,6 +1724,9 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
|
||||
|
||||
def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
|
||||
return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
|
||||
|
||||
def execute_payload(endpoint: Endpoint):
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -1808,65 +1812,6 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
|
||||
|
||||
# Test pull_timeline while concurrently gc'ing WAL on safekeeper:
|
||||
# 1) Start pull_timeline, listing files to fetch.
|
||||
# 2) Write segment, do gc.
|
||||
# 3) Finish pull_timeline.
|
||||
# 4) Do some write, verify integrity with timeline_digest.
|
||||
# Expected to fail while holding off WAL gc plus fetching commit_lsn WAL
|
||||
# segment is not implemented.
|
||||
@pytest.mark.xfail
|
||||
def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
|
||||
|
||||
log.info("use only first 2 safekeepers, 3rd will be seeded")
|
||||
endpoint = env.endpoints.create("main")
|
||||
endpoint.active_safekeepers = [1, 2]
|
||||
endpoint.start()
|
||||
endpoint.safe_psql("create table t(key int, value text)")
|
||||
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
|
||||
|
||||
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
|
||||
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
|
||||
|
||||
dst_http = dst_sk.http_client()
|
||||
# run pull_timeline which will halt before downloading files
|
||||
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
|
||||
pt_handle = PropagatingThread(
|
||||
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
|
||||
)
|
||||
pt_handle.start()
|
||||
dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
|
||||
|
||||
# ensure segment exists
|
||||
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
|
||||
lsn = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
assert lsn > Lsn("0/2000000")
|
||||
# Checkpoint timeline beyond lsn.
|
||||
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn)
|
||||
first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
|
||||
log.info(f"first segment exist={os.path.exists(first_segment_p)}")
|
||||
|
||||
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
|
||||
pt_handle.join()
|
||||
|
||||
timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id)
|
||||
dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id)
|
||||
log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}")
|
||||
assert dst_flush_lsn >= src_flush_lsn
|
||||
digests = [
|
||||
sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, dst_flush_lsn)
|
||||
for sk in [src_sk, dst_sk]
|
||||
]
|
||||
assert digests[0] == digests[1], f"digest on src is {digests[0]} but on dst is {digests[1]}"
|
||||
|
||||
|
||||
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
|
||||
# when compute is active, but there are no writes to the timeline. In that case
|
||||
# pageserver should maintain a single connection to safekeeper and don't attempt
|
||||
|
||||
@@ -531,64 +531,6 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
|
||||
asyncio.run(run_recovery_uncommitted(env))
|
||||
|
||||
|
||||
async def run_wal_truncation(env: NeonEnv):
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
(sk1, sk2, sk3) = env.safekeepers
|
||||
|
||||
ep = env.endpoints.create_start("main")
|
||||
ep.safe_psql("create table t (key int, value text)")
|
||||
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
|
||||
|
||||
# insert with only one sk3 up to create tail of flushed but not committed WAL on it
|
||||
sk1.stop()
|
||||
sk2.stop()
|
||||
conn = await ep.connect_async()
|
||||
# query should hang, so execute in separate task
|
||||
bg_query = asyncio.create_task(
|
||||
conn.execute("insert into t select generate_series(1, 180000), 'Papaya'")
|
||||
)
|
||||
sleep_sec = 2
|
||||
await asyncio.sleep(sleep_sec)
|
||||
# it must still not be finished
|
||||
assert not bg_query.done()
|
||||
# note: destroy will kill compute_ctl, preventing it from waiting for hanging sync-safekeepers.
|
||||
ep.stop_and_destroy()
|
||||
|
||||
# stop sk3 as well
|
||||
sk3.stop()
|
||||
|
||||
# now start sk1 and sk2 and make them commit something
|
||||
sk1.start()
|
||||
sk2.start()
|
||||
ep = env.endpoints.create_start(
|
||||
"main",
|
||||
)
|
||||
ep.safe_psql("insert into t select generate_series(1, 200), 'payload'")
|
||||
|
||||
# start sk3 and wait for it to catch up
|
||||
sk3.start()
|
||||
flush_lsn = Lsn(ep.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()"))
|
||||
await wait_for_lsn(sk3, tenant_id, timeline_id, flush_lsn)
|
||||
|
||||
timeline_start_lsn = sk1.get_timeline_start_lsn(tenant_id, timeline_id)
|
||||
digests = [
|
||||
sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, flush_lsn)
|
||||
for sk in [sk1, sk2]
|
||||
]
|
||||
assert digests[0] == digests[1], f"digest on sk1 is {digests[0]} but on sk2 is {digests[1]}"
|
||||
|
||||
|
||||
# Simple deterministic test creating tail of WAL on safekeeper which is
|
||||
# truncated when majority without this sk elects walproposer starting earlier.
|
||||
def test_wal_truncation(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
asyncio.run(run_wal_truncation(env))
|
||||
|
||||
|
||||
async def run_segment_init_failure(env: NeonEnv):
|
||||
env.neon_cli.create_branch("test_segment_init_failure")
|
||||
ep = env.endpoints.create_start("test_segment_init_failure")
|
||||
|
||||
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 17e0f5ff4e...0d30e28f74
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: c2c3d40534...74fb144890
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: b228f20372...3c2b9d576c
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
{
  "v16": ["16.3", "b228f20372ebcabfd7946647cb7adbd38bacb14a"],
  "v15": ["15.7", "c2c3d40534db97d83dd7e185d1971e707fa2f445"],
  "v14": ["14.12", "17e0f5ff4e1905691aa40e1e08f9b79b14c99652"]
  "v16": ["16.3", "3c2b9d576c580e0b5b7108001f959b8c5b42e0a2"],
  "v15": ["15.7", "74fb144890c4f955db1ef50ee1eeb9d8a6c2f69d"],
  "v14": ["14.12", "0d30e28f74f49fe6a27a6bd45dcfeb1060656b8f"]
}

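The hashes in vendor/revisions.json are expected to track the vendored submodule commits recorded in the "Submodule ... updated" lines above. A minimal sketch of such a consistency check, assuming the layout shown here (revisions.json mapping "vNN" to [postgres_version, commit_sha], submodules at vendor/postgres-vNN); the script itself is illustrative and not part of this diff:

import json
import subprocess
from pathlib import Path

def check_vendor_revisions(repo_root: Path = Path(".")) -> None:
    # revisions.json maps "vNN" -> [postgres_minor_version, expected_submodule_sha]
    revisions = json.loads((repo_root / "vendor" / "revisions.json").read_text())
    for version, (_pg_version, expected_sha) in revisions.items():
        submodule = repo_root / "vendor" / f"postgres-{version}"
        actual_sha = subprocess.check_output(
            ["git", "-C", str(submodule), "rev-parse", "HEAD"], text=True
        ).strip()
        assert actual_sha == expected_sha, f"{submodule}: HEAD {actual_sha} != {expected_sha}"
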
@@ -194,7 +194,7 @@ files:

  - metric_name: pg_stats_userdb
    type: gauge
    help: 'Stats for several oldest non-system dbs'
    help: 'Stats for the oldest non-system db'
    key_labels:
      - datname
    value_label: kind
@@ -205,8 +205,9 @@ files:
      - inserted
      - updated
      - deleted
    # We export stats for 10 non-system database. Without this limit
    # We export stats for only one non-system database. Without this limit
    # it is too easy to abuse the system by creating lots of databases.
    # We can try lifting this limit in the future after we understand the needs better.
    query: |
      select pg_database_size(datname) as db_size, deadlocks,
        tup_inserted as inserted, tup_updated as updated, tup_deleted as deleted,
@@ -217,7 +218,7 @@ files:
        from pg_database
        where datname <> 'postgres' and not datistemplate
        order by oid
        limit 10
        limit 1
      );

  - metric_name: max_cluster_size
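
The query above picks databases in oid order (roughly creation order), so the limit keeps only the oldest non-system, non-template database(s). A minimal standalone sketch of that database-selection subquery, assuming a psycopg2 connection; the connection string handling and helper name are illustrative and not part of the exporter config:

import psycopg2

# Same filter/order/limit as the exporter's database-selection subquery above.
OLDEST_USERDB_SQL = """
select datname, pg_database_size(datname) as db_size
from pg_database
where datname <> 'postgres' and not datistemplate
order by oid
limit 1
"""

def oldest_user_database(connstr: str):
    # Returns (datname, size_in_bytes) for the oldest user database, or None.
    with psycopg2.connect(connstr) as conn, conn.cursor() as cur:
        cur.execute(OLDEST_USERDB_SQL)
        return cur.fetchone()
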
@@ -319,7 +320,7 @@ files:

  - metric_name: wal_is_lost
    type: gauge
    help: 'Whether or not the replication slot wal_status is lost'
    help: 'Whether or not the replication slot\'s wal_status is lost'
    key_labels:
      - slot_name
    values: [wal_status_is_lost]

@@ -59,7 +59,7 @@ regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "rustls-tls", "stream"] }
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "default-tls", "stream"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
@@ -68,7 +68,7 @@ sha2 = { version = "0.10", features = ["asm"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
subtle = { version = "2" }
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }