Compare commits

..

10 Commits

Author SHA1 Message Date
Alex Chi Z
499105da6d passthrough wait_for_upload, better upload scheduling
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-21 14:11:46 -05:00
Alex Chi Z
95474cfbe0 do not wait for checkpoint in emergency mode
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-21 13:34:01 -05:00
Alex Chi Z
b501f1a681 Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/fix-barrier 2024-11-21 12:53:42 -05:00
Alex Chi Z
d710f000ef fix test_emergency_mode
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-20 16:54:58 -05:00
Alex Chi Z
feaeba3750 Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/fix-barrier 2024-11-20 16:38:44 -05:00
Alex Chi Z
2c4829c2bf fix assertions
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-20 16:34:25 -05:00
Alex Chi Z
cdde254c84 fix type check
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-19 11:58:06 -05:00
Alex Chi Z
a9db766c20 fix local fs semantics due to race deletion/download
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-19 11:56:42 -05:00
Alex Chi Z
42ac6f6377 fix the issue
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-19 11:56:41 -05:00
Alex Chi Z
45f6111ad9 create the test case to reproduce the issue
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-19 11:56:41 -05:00
108 changed files with 750 additions and 4591 deletions

View File

@@ -19,8 +19,8 @@ on:
description: 'debug or release'
required: true
type: string
test-cfg:
description: 'a json object of postgres versions and lfc states to run regression tests on'
pg-versions:
description: 'a json array of postgres versions to run regression tests on'
required: true
type: string
@@ -276,14 +276,14 @@ jobs:
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
strategy:
fail-fast: false
matrix: ${{ fromJSON(format('{{"include":{0}}}', inputs.test-cfg)) }}
matrix:
pg_version: ${{ fromJson(inputs.pg-versions) }}
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Pytest regression tests
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' }}
uses: ./.github/actions/run-python-test-set
timeout-minutes: 60
with:
@@ -300,7 +300,6 @@ jobs:
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540

View File

@@ -558,12 +558,12 @@ jobs:
arch=$(uname -m | sed 's/x86_64/amd64/g' | sed 's/aarch64/arm64/g')
cd /home/nonroot
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-17/libpq5_17.2-1.pgdg110+1_${arch}.deb"
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-client-16_16.6-1.pgdg110+1_${arch}.deb"
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-16_16.6-1.pgdg110+1_${arch}.deb"
dpkg -x libpq5_17.2-1.pgdg110+1_${arch}.deb pg
dpkg -x postgresql-16_16.6-1.pgdg110+1_${arch}.deb pg
dpkg -x postgresql-client-16_16.6-1.pgdg110+1_${arch}.deb pg
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-17/libpq5_17.1-1.pgdg110+1_${arch}.deb"
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-client-16_16.5-1.pgdg110+1_${arch}.deb"
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-16_16.5-1.pgdg110+1_${arch}.deb"
dpkg -x libpq5_17.1-1.pgdg110+1_${arch}.deb pg
dpkg -x postgresql-16_16.5-1.pgdg110+1_${arch}.deb pg
dpkg -x postgresql-client-16_16.5-1.pgdg110+1_${arch}.deb pg
mkdir -p /tmp/neon/pg_install/v16/bin
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/pgbench /tmp/neon/pg_install/v16/bin/pgbench

View File

@@ -253,14 +253,7 @@ jobs:
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
# Run tests on all Postgres versions in release builds and only on the latest version in debug builds
# run without LFC on v17 release only
test-cfg: |
${{ matrix.build-type == 'release' && '[{"pg_version":"v14", "lfc_state": "without-lfc"},
{"pg_version":"v15", "lfc_state": "without-lfc"},
{"pg_version":"v16", "lfc_state": "without-lfc"},
{"pg_version":"v17", "lfc_state": "without-lfc"},
{"pg_version":"v17", "lfc_state": "with-lfc"}]'
|| '[{"pg_version":"v17", "lfc_state": "without-lfc"}]' }}
pg-versions: ${{ matrix.build-type == 'release' && '["v14", "v15", "v16", "v17"]' || '["v17"]' }}
secrets: inherit
# Keep `benchmarks` job outside of `build-and-test-locally` workflow to make job failures non-blocking

View File

@@ -4,12 +4,10 @@ on:
schedule:
- cron: '*/15 * * * *'
- cron: '25 0 * * *'
- cron: '25 1 * * 6'
jobs:
gh-workflow-stats-batch-2h:
name: GitHub Workflow Stats Batch 2 hours
if: github.event.schedule == '*/15 * * * *'
gh-workflow-stats-batch:
name: GitHub Workflow Stats Batch
runs-on: ubuntu-22.04
permissions:
actions: read
@@ -18,36 +16,14 @@ jobs:
uses: neondatabase/gh-workflow-stats-action@v0.2.1
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
db_table: "gh_workflow_stats_batch_neon"
gh_token: ${{ secrets.GITHUB_TOKEN }}
duration: '2h'
gh-workflow-stats-batch-48h:
name: GitHub Workflow Stats Batch 48 hours
if: github.event.schedule == '25 0 * * *'
runs-on: ubuntu-22.04
permissions:
actions: read
steps:
- name: Export Workflow Run for the past 48 hours
- name: Export Workflow Run for the past 24 hours
if: github.event.schedule == '25 0 * * *'
uses: neondatabase/gh-workflow-stats-action@v0.2.1
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
db_table: "gh_workflow_stats_batch_neon"
gh_token: ${{ secrets.GITHUB_TOKEN }}
duration: '48h'
gh-workflow-stats-batch-30d:
name: GitHub Workflow Stats Batch 30 days
if: github.event.schedule == '25 1 * * 6'
runs-on: ubuntu-22.04
permissions:
actions: read
steps:
- name: Export Workflow Run for the past 30 days
uses: neondatabase/gh-workflow-stats-action@v0.2.1
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
gh_token: ${{ secrets.GITHUB_TOKEN }}
duration: '720h'
duration: '24h'

View File

@@ -0,0 +1,41 @@
name: Report Workflow Stats
on:
workflow_run:
workflows:
- Add `external` label to issues and PRs created by external users
- Benchmarking
- Build and Test
- Build and Test Locally
- Build build-tools image
- Check Permissions
- Check neon with extra platform builds
- Cloud Regression Test
- Create Release Branch
- Handle `approved-for-ci-run` label
- Lint GitHub Workflows
- Notify Slack channel about upcoming release
- Periodic pagebench performance test on dedicated EC2 machine in eu-central-1 region
- Pin build-tools image
- Prepare benchmarking databases by restoring dumps
- Push images to ACR
- Test Postgres client libraries
- Trigger E2E Tests
- cleanup caches by a branch
- Pre-merge checks
types: [completed]
jobs:
gh-workflow-stats:
name: Github Workflow Stats
runs-on: ubuntu-22.04
permissions:
actions: read
steps:
- name: Export GH Workflow Stats
uses: neondatabase/gh-workflow-stats-action@v0.1.4
with:
DB_URI: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
DB_TABLE: "gh_workflow_stats_neon"
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GH_RUN_ID: ${{ github.event.workflow_run.id }}

301
Cargo.lock generated
View File

@@ -46,15 +46,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "aligned-vec"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e0966165eaf052580bd70eb1b32cb3d6245774c0104d1b2793e9650bf83b52a"
dependencies = [
"equator",
]
[[package]]
name = "allocator-api2"
version = "0.2.16"
@@ -155,12 +146,6 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "arrayvec"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "asn1-rs"
version = "0.6.2"
@@ -374,28 +359,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "aws-sdk-kms"
version = "1.47.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "564a597a3c71a957d60a2e4c62c93d78ee5a0d636531e15b760acad983a5c18e"
dependencies = [
"aws-credential-types",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
"aws-types",
"bytes",
"http 0.2.9",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sdk-s3"
version = "1.52.0"
@@ -612,9 +575,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "1.7.2"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db"
checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@@ -779,7 +742,7 @@ dependencies = [
"once_cell",
"paste",
"pin-project",
"quick-xml 0.31.0",
"quick-xml",
"rand 0.8.5",
"reqwest 0.11.19",
"rustc_version",
@@ -1257,10 +1220,6 @@ name = "compute_tools"
version = "0.1.0"
dependencies = [
"anyhow",
"aws-config",
"aws-sdk-kms",
"aws-sdk-s3",
"base64 0.13.1",
"bytes",
"camino",
"cfg-if",
@@ -1278,16 +1237,13 @@ dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"postgres",
"postgres_initdb",
"prometheus",
"regex",
"remote_storage",
"reqwest 0.12.4",
"rlimit",
"rust-ini",
"serde",
"serde_json",
"serde_with",
"signal-hook",
"tar",
"thiserror",
@@ -1425,15 +1381,6 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
[[package]]
name = "cpp_demangle"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d"
dependencies = [
"cfg-if",
]
[[package]]
name = "cpufeatures"
version = "0.2.9"
@@ -1957,26 +1904,6 @@ dependencies = [
"termcolor",
]
[[package]]
name = "equator"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c35da53b5a021d2484a7cc49b2ac7f2d840f8236a286f84202369bd338d761ea"
dependencies = [
"equator-macro",
]
[[package]]
name = "equator-macro"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bf679796c0322556351f287a51b49e48f7c4986e727b5dd78c972d30e2e16cc"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "equivalent"
version = "1.0.1"
@@ -2084,18 +2011,6 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "findshlibs"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
dependencies = [
"cc",
"lazy_static",
"libc",
"winapi",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -2174,9 +2089,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
@@ -2184,9 +2099,9 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
@@ -2201,9 +2116,9 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-lite"
@@ -2222,9 +2137,9 @@ dependencies = [
[[package]]
name = "futures-macro"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
@@ -2233,15 +2148,15 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-timer"
@@ -2251,9 +2166,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
@@ -2799,24 +2714,6 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac"
[[package]]
name = "inferno"
version = "0.11.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
dependencies = [
"ahash",
"indexmap 2.0.1",
"is-terminal",
"itoa",
"log",
"num-format",
"once_cell",
"quick-xml 0.26.0",
"rgb",
"str_stack",
]
[[package]]
name = "inotify"
version = "0.9.6"
@@ -2867,9 +2764,9 @@ dependencies = [
[[package]]
name = "ipnet"
version = "2.10.1"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "is-terminal"
@@ -3156,15 +3053,6 @@ version = "2.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
[[package]]
name = "memmap2"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45fd3a57831bf88bc63f8cebc0cf956116276e97fef3966103e96416209f7c92"
dependencies = [
"libc",
]
[[package]]
name = "memoffset"
version = "0.7.1"
@@ -3390,16 +3278,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-format"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
dependencies = [
"arrayvec",
"itoa",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -3741,7 +3619,6 @@ dependencies = [
"num_cpus",
"once_cell",
"pageserver_api",
"pageserver_client",
"pageserver_compaction",
"pin-project-lite",
"postgres",
@@ -3750,7 +3627,6 @@ dependencies = [
"postgres_backend",
"postgres_connection",
"postgres_ffi",
"postgres_initdb",
"pq_proto",
"procfs",
"rand 0.8.5",
@@ -4182,7 +4058,7 @@ dependencies = [
"bytes",
"once_cell",
"pq_proto",
"rustls 0.23.18",
"rustls 0.23.16",
"rustls-pemfile 2.1.1",
"serde",
"thiserror",
@@ -4226,48 +4102,12 @@ dependencies = [
"utils",
]
[[package]]
name = "postgres_initdb"
version = "0.1.0"
dependencies = [
"anyhow",
"camino",
"thiserror",
"tokio",
"workspace_hack",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "pprof"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebbe2f8898beba44815fdc9e5a4ae9c929e21c5dc29b0c774a15555f7f58d6d0"
dependencies = [
"aligned-vec",
"backtrace",
"cfg-if",
"criterion",
"findshlibs",
"inferno",
"libc",
"log",
"nix 0.26.4",
"once_cell",
"parking_lot 0.12.1",
"protobuf",
"protobuf-codegen-pure",
"smallvec",
"symbolic-demangle",
"tempfile",
"thiserror",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@@ -4420,31 +4260,6 @@ dependencies = [
"prost",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "protobuf-codegen"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6"
dependencies = [
"protobuf",
]
[[package]]
name = "protobuf-codegen-pure"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95a29399fc94bcd3eeaa951c715f7bea69409b2445356b00519740bcd6ddd865"
dependencies = [
"protobuf",
"protobuf-codegen",
]
[[package]]
name = "proxy"
version = "0.1.0"
@@ -4518,7 +4333,7 @@ dependencies = [
"rsa",
"rstest",
"rustc-hash",
"rustls 0.23.18",
"rustls 0.23.16",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
"scopeguard",
@@ -4556,15 +4371,6 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "quick-xml"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
dependencies = [
"memchr",
]
[[package]]
name = "quick-xml"
version = "0.31.0"
@@ -4861,7 +4667,6 @@ dependencies = [
"once_cell",
"pin-project-lite",
"rand 0.8.5",
"reqwest 0.11.19",
"scopeguard",
"serde",
"serde_json",
@@ -5048,15 +4853,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "rgb"
version = "0.8.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57397d16646700483b67d2dd6511d79318f9d057fdbd21a4066aeac8b41d310a"
dependencies = [
"bytemuck",
]
[[package]]
name = "ring"
version = "0.17.6"
@@ -5232,9 +5028,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.18"
version = "0.23.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f"
checksum = "eee87ff5d9b36712a58574e12e9f0ea80f915a5b0ac518d322b24a465617925e"
dependencies = [
"log",
"once_cell",
@@ -5370,7 +5166,6 @@ dependencies = [
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",
@@ -5917,12 +5712,6 @@ dependencies = [
"der 0.7.8",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
@@ -5949,7 +5738,7 @@ dependencies = [
"once_cell",
"parking_lot 0.12.1",
"prost",
"rustls 0.23.18",
"rustls 0.23.16",
"tokio",
"tonic",
"tonic-build",
@@ -6032,7 +5821,7 @@ dependencies = [
"postgres_ffi",
"remote_storage",
"reqwest 0.12.4",
"rustls 0.23.18",
"rustls 0.23.16",
"rustls-native-certs 0.8.0",
"serde",
"serde_json",
@@ -6069,12 +5858,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "str_stack"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
[[package]]
name = "stringprep"
version = "0.1.2"
@@ -6122,29 +5905,6 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20e16a0f46cf5fd675563ef54f26e83e20f2366bcf027bcb3cc3ed2b98aaf2ca"
[[package]]
name = "symbolic-common"
version = "12.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "366f1b4c6baf6cfefc234bbd4899535fca0b06c74443039a73f6dfb2fad88d77"
dependencies = [
"debugid",
"memmap2",
"stable_deref_trait",
"uuid",
]
[[package]]
name = "symbolic-demangle"
version = "12.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aba05ba5b9962ea5617baf556293720a8b2d0a282aa14ee4bf10e22efc7da8c8"
dependencies = [
"cpp_demangle",
"rustc-demangle",
"symbolic-common",
]
[[package]]
name = "syn"
version = "1.0.109"
@@ -6494,7 +6254,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab"
dependencies = [
"ring",
"rustls 0.23.18",
"rustls 0.23.16",
"tokio",
"tokio-postgres",
"tokio-rustls 0.26.0",
@@ -6528,7 +6288,7 @@ version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls 0.23.18",
"rustls 0.23.16",
"rustls-pki-types",
"tokio",
]
@@ -6937,7 +6697,7 @@ dependencies = [
"base64 0.22.1",
"log",
"once_cell",
"rustls 0.23.18",
"rustls 0.23.16",
"rustls-pki-types",
"url",
"webpki-roots 0.26.1",
@@ -7012,7 +6772,6 @@ dependencies = [
"once_cell",
"pin-project-lite",
"postgres_connection",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",
@@ -7547,7 +7306,6 @@ dependencies = [
"anyhow",
"axum",
"axum-core",
"base64 0.13.1",
"base64 0.21.1",
"base64ct",
"bytes",
@@ -7582,7 +7340,6 @@ dependencies = [
"libc",
"log",
"memchr",
"nix 0.26.4",
"nom",
"num-bigint",
"num-integer",
@@ -7599,7 +7356,7 @@ dependencies = [
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"reqwest 0.12.4",
"rustls 0.23.18",
"rustls 0.23.16",
"scopeguard",
"serde",
"serde_json",

View File

@@ -34,7 +34,6 @@ members = [
"libs/vm_monitor",
"libs/walproposer",
"libs/wal_decoder",
"libs/postgres_initdb",
]
[workspace.package]
@@ -58,7 +57,6 @@ async-trait = "0.1"
aws-config = { version = "1.5", default-features = false, features=["rustls", "sso"] }
aws-sdk-s3 = "1.52"
aws-sdk-iam = "1.46.0"
aws-sdk-kms = "1.47.0"
aws-smithy-async = { version = "1.2.1", default-features = false, features=["rt-tokio"] }
aws-smithy-types = "1.2"
aws-credential-types = "1.2.0"
@@ -75,7 +73,7 @@ bytes = "1.0"
camino = "1.1.6"
cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive", "env"] }
clap = { version = "4.0", features = ["derive"] }
comfy-table = "7.1"
const_format = "0.2"
crc32c = "0.6"
@@ -108,7 +106,7 @@ hyper-util = "0.1"
tokio-tungstenite = "0.21.0"
indexmap = "2"
indoc = "2"
ipnet = "2.10.0"
ipnet = "2.9.0"
itertools = "0.10"
itoa = "1.0.11"
jsonwebtoken = "9"
@@ -132,7 +130,6 @@ parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
pprof = { version = "0.14", features = ["criterion", "flamegraph", "protobuf", "protobuf-codec"] }
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13"
@@ -156,7 +153,7 @@ sentry = { version = "0.32", default-features = false, features = ["backtrace",
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_path_to_error = "0.1"
serde_with = { version = "2.0", features = [ "base64" ] }
serde_with = "2.0"
serde_assert = "0.5.0"
sha2 = "0.10.2"
signal-hook = "0.3"
@@ -215,14 +212,12 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", br
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
postgres_initdb = { path = "./libs/postgres_initdb" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }

View File

@@ -1243,7 +1243,7 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
#########################################################################################
#
# Compile and run the Neon-specific `compute_ctl` and `fast_import` binaries
# Compile and run the Neon-specific `compute_ctl` binary
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
@@ -1264,7 +1264,6 @@ RUN cd compute_tools && mold -run cargo build --locked --profile release-line-de
FROM debian:$DEBIAN_FLAVOR AS compute-tools-image
COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/fast_import /usr/local/bin/fast_import
#########################################################################################
#
@@ -1459,7 +1458,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/fast_import /usr/local/bin/fast_import
# pgbouncer and its config
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer
@@ -1535,25 +1533,6 @@ RUN apt update && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
# s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2
# used by fast_import
ARG TARGETARCH
ADD https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_linux_$TARGETARCH.deb /tmp/s5cmd.deb
RUN set -ex; \
\
# Determine the expected checksum based on TARGETARCH
if [ "${TARGETARCH}" = "amd64" ]; then \
CHECKSUM="392c385320cd5ffa435759a95af77c215553d967e4b1c0fffe52e4f14c29cf85"; \
elif [ "${TARGETARCH}" = "arm64" ]; then \
CHECKSUM="939bee3cf4b5604ddb00e67f8c157b91d7c7a5b553d1fbb6890fad32894b7b46"; \
else \
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
fi; \
\
# Compute and validate the checksum
echo "${CHECKSUM} /tmp/s5cmd.deb" | sha256sum -c -
RUN dpkg -i /tmp/s5cmd.deb && rm /tmp/s5cmd.deb
ENV LANG=en_US.utf8
USER postgres
ENTRYPOINT ["/usr/local/bin/compute_ctl"]

View File

@@ -10,10 +10,6 @@ default = []
testing = []
[dependencies]
base64.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-sdk-kms.workspace = true
anyhow.workspace = true
camino.workspace = true
chrono.workspace = true
@@ -31,8 +27,6 @@ opentelemetry.workspace = true
opentelemetry_sdk.workspace = true
postgres.workspace = true
regex.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
tar.workspace = true
@@ -49,7 +43,6 @@ thiserror.workspace = true
url.workspace = true
prometheus.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -1,338 +0,0 @@
//! This program dumps a remote Postgres database into a local Postgres database
//! and uploads the resulting PGDATA into object storage for import into a Timeline.
//!
//! # Context, Architecture, Design
//!
//! See cloud.git Fast Imports RFC (<https://github.com/neondatabase/cloud/pull/19799>)
//! for the full picture.
//! The RFC describing the storage pieces of importing the PGDATA dump into a Timeline
//! is publicly accessible at <https://github.com/neondatabase/neon/pull/9538>.
//!
//! # This is a Prototype!
//!
//! This program is part of a prototype feature and not yet used in production.
//!
//! The cloud.git RFC contains lots of suggestions for improving e2e throughput
//! of this step of the timeline import process.
//!
//! # Local Testing
//!
//! - Comment out most of the pgxns in The Dockerfile.compute-tools to speed up the build.
//! - Build the image with the following command:
//!
//! ```bash
//! docker buildx build --build-arg DEBIAN_FLAVOR=bullseye-slim --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/Dockerfile.com
//! docker push localhost:3030/localregistry/compute-node-v14:latest
//! ```
use anyhow::Context;
use aws_config::BehaviorVersion;
use camino::{Utf8Path, Utf8PathBuf};
use clap::Parser;
use nix::unistd::Pid;
use tracing::{info, info_span, warn, Instrument};
use utils::fs_ext::is_directory_empty;
#[path = "fast_import/child_stdio_to_log.rs"]
mod child_stdio_to_log;
#[path = "fast_import/s3_uri.rs"]
mod s3_uri;
#[path = "fast_import/s5cmd.rs"]
mod s5cmd;
#[derive(clap::Parser)]
struct Args {
#[clap(long)]
working_directory: Utf8PathBuf,
#[clap(long, env = "NEON_IMPORTER_S3_PREFIX")]
s3_prefix: s3_uri::S3Uri,
#[clap(long)]
pg_bin_dir: Utf8PathBuf,
#[clap(long)]
pg_lib_dir: Utf8PathBuf,
}
#[serde_with::serde_as]
#[derive(serde::Deserialize)]
struct Spec {
encryption_secret: EncryptionSecret,
#[serde_as(as = "serde_with::base64::Base64")]
source_connstring_ciphertext_base64: Vec<u8>,
}
#[derive(serde::Deserialize)]
enum EncryptionSecret {
#[allow(clippy::upper_case_acronyms)]
KMS { key_id: String },
}
#[tokio::main]
pub(crate) async fn main() -> anyhow::Result<()> {
utils::logging::init(
utils::logging::LogFormat::Plain,
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::Output::Stdout,
)?;
info!("starting");
let Args {
working_directory,
s3_prefix,
pg_bin_dir,
pg_lib_dir,
} = Args::parse();
let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
let spec: Spec = {
let spec_key = s3_prefix.append("/spec.json");
let s3_client = aws_sdk_s3::Client::new(&aws_config);
let object = s3_client
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
.send()
.await
.context("get spec from s3")?
.body
.collect()
.await
.context("download spec body")?;
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
};
match tokio::fs::create_dir(&working_directory).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if !is_directory_empty(&working_directory)
.await
.context("check if working directory is empty")?
{
anyhow::bail!("working directory is not empty");
} else {
// ok
}
}
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
}
let pgdata_dir = working_directory.join("pgdata");
tokio::fs::create_dir(&pgdata_dir)
.await
.context("create pgdata directory")?;
//
// Setup clients
//
let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
let kms_client = aws_sdk_kms::Client::new(&aws_config);
//
// Initialize pgdata
//
let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
superuser,
locale: "en_US.UTF-8", // XXX: this shouldn't be hard-coded,
pg_version: 140000, // XXX: this shouldn't be hard-coded but derived from which compute image we're running in
initdb_bin: pg_bin_dir.join("initdb").as_ref(),
library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
pgdata: &pgdata_dir,
})
.await
.context("initdb")?;
let nproc = num_cpus::get();
//
// Launch postgres process
//
let mut postgres_proc = tokio::process::Command::new(pg_bin_dir.join("postgres"))
.arg("-D")
.arg(&pgdata_dir)
.args(["-c", "wal_level=minimal"])
.args(["-c", "shared_buffers=10GB"])
.args(["-c", "max_wal_senders=0"])
.args(["-c", "fsync=off"])
.args(["-c", "full_page_writes=off"])
.args(["-c", "synchronous_commit=off"])
.args(["-c", "maintenance_work_mem=8388608"])
.args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
.args(["-c", &format!("max_worker_processes={nproc}")])
.args(["-c", "effective_io_concurrency=100"])
.env_clear()
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn postgres")?;
info!("spawned postgres, waiting for it to become ready");
tokio::spawn(
child_stdio_to_log::relay_process_output(
postgres_proc.stdout.take(),
postgres_proc.stderr.take(),
)
.instrument(info_span!("postgres")),
);
let restore_pg_connstring =
format!("host=localhost port=5432 user={superuser} dbname=postgres");
loop {
let res = tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await;
if res.is_ok() {
info!("postgres is ready, could connect to it");
break;
}
}
//
// Decrypt connection string
//
let source_connection_string = {
match spec.encryption_secret {
EncryptionSecret::KMS { key_id } => {
let mut output = kms_client
.decrypt()
.key_id(key_id)
.ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
spec.source_connstring_ciphertext_base64,
))
.send()
.await
.context("decrypt source connection string")?;
let plaintext = output
.plaintext
.take()
.context("get plaintext source connection string")?;
String::from_utf8(plaintext.into_inner())
.context("parse source connection string as utf8")?
}
}
};
//
// Start the work
//
let dumpdir = working_directory.join("dumpdir");
let common_args = [
// schema mapping (prob suffices to specify them on one side)
"--no-owner".to_string(),
"--no-privileges".to_string(),
"--no-publications".to_string(),
"--no-security-labels".to_string(),
"--no-subscriptions".to_string(),
"--no-tablespaces".to_string(),
// format
"--format".to_string(),
"directory".to_string(),
// concurrency
"--jobs".to_string(),
num_cpus::get().to_string(),
// progress updates
"--verbose".to_string(),
];
info!("dump into the working directory");
{
let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump"))
.args(&common_args)
.arg("-f")
.arg(&dumpdir)
.arg("--no-sync")
// POSITIONAL args
// source db (db name included in connection string)
.arg(&source_connection_string)
// how we run it
.env_clear()
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn pg_dump")?;
info!(pid=%pg_dump.id().unwrap(), "spawned pg_dump");
tokio::spawn(
child_stdio_to_log::relay_process_output(pg_dump.stdout.take(), pg_dump.stderr.take())
.instrument(info_span!("pg_dump")),
);
let st = pg_dump.wait().await.context("wait for pg_dump")?;
info!(status=?st, "pg_dump exited");
if !st.success() {
warn!(status=%st, "pg_dump failed, restore will likely fail as well");
}
}
// TODO: do it in a streaming way, plenty of internal research done on this already
// TODO: do the unlogged table trick
info!("restore from working directory into vanilla postgres");
{
let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore"))
.args(&common_args)
.arg("-d")
.arg(&restore_pg_connstring)
// POSITIONAL args
.arg(&dumpdir)
// how we run it
.env_clear()
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn pg_restore")?;
info!(pid=%pg_restore.id().unwrap(), "spawned pg_restore");
tokio::spawn(
child_stdio_to_log::relay_process_output(
pg_restore.stdout.take(),
pg_restore.stderr.take(),
)
.instrument(info_span!("pg_restore")),
);
let st = pg_restore.wait().await.context("wait for pg_restore")?;
info!(status=?st, "pg_restore exited");
if !st.success() {
warn!(status=%st, "pg_restore failed, restore will likely fail as well");
}
}
info!("shutdown postgres");
{
nix::sys::signal::kill(
Pid::from_raw(
i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"),
),
nix::sys::signal::SIGTERM,
)
.context("signal postgres to shut down")?;
postgres_proc
.wait()
.await
.context("wait for postgres to shut down")?;
}
info!("upload pgdata");
s5cmd::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/"))
.await
.context("sync dump directory to destination")?;
info!("write status");
{
let status_dir = working_directory.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
let status_file = status_dir.join("status");
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
.context("write status file")?;
s5cmd::sync(&status_file, &s3_prefix.append("/status/pgdata"))
.await
.context("sync status directory to destination")?;
}
Ok(())
}

View File

@@ -1,35 +0,0 @@
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{ChildStderr, ChildStdout};
use tracing::info;
/// Asynchronously relays the output from a child process's `stdout` and `stderr` to the tracing log.
/// Each line is read and logged individually, with lossy UTF-8 conversion.
///
/// # Arguments
///
/// * `stdout`: An `Option<ChildStdout>` from the child process.
/// * `stderr`: An `Option<ChildStderr>` from the child process.
///
pub(crate) async fn relay_process_output(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) {
let stdout_fut = async {
if let Some(stdout) = stdout {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
info!(fd = "stdout", "{}", line);
}
}
};
let stderr_fut = async {
if let Some(stderr) = stderr {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
info!(fd = "stderr", "{}", line);
}
}
};
tokio::join!(stdout_fut, stderr_fut);
}

View File

@@ -1,75 +0,0 @@
use anyhow::Result;
use std::str::FromStr;
/// Struct to hold parsed S3 components
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct S3Uri {
pub bucket: String,
pub key: String,
}
impl FromStr for S3Uri {
type Err = anyhow::Error;
/// Parse an S3 URI into a bucket and key
fn from_str(uri: &str) -> Result<Self> {
// Ensure the URI starts with "s3://"
if !uri.starts_with("s3://") {
return Err(anyhow::anyhow!("Invalid S3 URI scheme"));
}
// Remove the "s3://" prefix
let stripped_uri = &uri[5..];
// Split the remaining string into bucket and key parts
if let Some((bucket, key)) = stripped_uri.split_once('/') {
Ok(S3Uri {
bucket: bucket.to_string(),
key: key.to_string(),
})
} else {
Err(anyhow::anyhow!(
"Invalid S3 URI format, missing bucket or key"
))
}
}
}
impl S3Uri {
pub fn append(&self, suffix: &str) -> Self {
Self {
bucket: self.bucket.clone(),
key: format!("{}{}", self.key, suffix),
}
}
}
impl std::fmt::Display for S3Uri {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "s3://{}/{}", self.bucket, self.key)
}
}
impl clap::builder::TypedValueParser for S3Uri {
type Value = Self;
fn parse_ref(
&self,
_cmd: &clap::Command,
_arg: Option<&clap::Arg>,
value: &std::ffi::OsStr,
) -> Result<Self::Value, clap::Error> {
let value_str = value.to_str().ok_or_else(|| {
clap::Error::raw(
clap::error::ErrorKind::InvalidUtf8,
"Invalid UTF-8 sequence",
)
})?;
S3Uri::from_str(value_str).map_err(|e| {
clap::Error::raw(
clap::error::ErrorKind::InvalidValue,
format!("Failed to parse S3 URI: {}", e),
)
})
}
}

View File

@@ -1,27 +0,0 @@
use anyhow::Context;
use camino::Utf8Path;
use super::s3_uri::S3Uri;
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
let mut builder = tokio::process::Command::new("s5cmd");
// s5cmd uses aws-sdk-go v1, hence doesn't support AWS_ENDPOINT_URL
if let Some(val) = std::env::var_os("AWS_ENDPOINT_URL") {
builder.arg("--endpoint-url").arg(val);
}
builder
.arg("sync")
.arg(local.as_str())
.arg(remote.to_string());
let st = builder
.spawn()
.context("spawn s5cmd")?
.wait()
.await
.context("wait for s5cmd")?;
if st.success() {
Ok(())
} else {
Err(anyhow::anyhow!("s5cmd failed"))
}
}

View File

@@ -116,7 +116,7 @@ pub fn write_postgres_conf(
vartype: "enum".to_owned(),
};
writeln!(file, "{}", opt.to_pg_setting())?;
write!(file, "{}", opt.to_pg_setting())?;
}
}

View File

@@ -20,7 +20,6 @@ use anyhow::Result;
use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
@@ -73,22 +72,10 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
(&Method::GET, "/metrics") => {
debug!("serving /metrics GET request");
// When we call TextEncoder::encode() below, it will immediately
// return an error if a metric family has no metrics, so we need to
// preemptively filter out metric families with no metrics.
let metrics = installed_extensions::collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
let encoder = TextEncoder::new();
let mut buffer = vec![];
if let Err(err) = encoder.encode(&metrics, &mut buffer) {
let msg = format!("error handling /metrics request: {err}");
error!(msg);
return render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR);
}
let metrics = installed_extensions::collect();
let encoder = TextEncoder::new();
encoder.encode(&metrics, &mut buffer).unwrap();
match Response::builder()
.status(StatusCode::OK)

View File

@@ -115,7 +115,7 @@ pub fn get_installed_extensions_sync(connstr: Url) -> Result<()> {
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version"]
)

View File

@@ -1153,7 +1153,6 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
timeline_info.timeline_id
);
}
// TODO: rename to import-basebackup-plus-wal
TimelineCmd::Import(args) => {
let tenant_id = get_tenant_id(args.tenant_id, env)?;
let timeline_id = args.timeline_id;

View File

@@ -33,6 +33,7 @@ reason = "the marvin attack only affects private key decryption, not public key
[licenses]
allow = [
"Apache-2.0",
"Artistic-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"CC0-1.0",
@@ -66,7 +67,7 @@ registries = []
# More documentation about the 'bans' section can be found here:
# https://embarkstudios.github.io/cargo-deny/checks/bans/cfg.html
[bans]
multiple-versions = "allow"
multiple-versions = "warn"
wildcards = "allow"
highlight = "all"
workspace-default-features = "allow"

View File

@@ -33,7 +33,6 @@ remote_storage.workspace = true
postgres_backend.workspace = true
nix = {workspace = true, optional = true}
reqwest.workspace = true
rand.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -97,15 +97,6 @@ pub struct ConfigToml {
pub control_plane_api: Option<reqwest::Url>,
pub control_plane_api_token: Option<String>,
pub control_plane_emergency_mode: bool,
/// Unstable feature: subject to change or removal without notice.
/// See <https://github.com/neondatabase/neon/pull/9218>.
pub import_pgdata_upcall_api: Option<reqwest::Url>,
/// Unstable feature: subject to change or removal without notice.
/// See <https://github.com/neondatabase/neon/pull/9218>.
pub import_pgdata_upcall_api_token: Option<String>,
/// Unstable feature: subject to change or removal without notice.
/// See <https://github.com/neondatabase/neon/pull/9218>.
pub import_pgdata_aws_endpoint_url: Option<reqwest::Url>,
pub heatmap_upload_concurrency: usize,
pub secondary_download_concurrency: usize,
pub virtual_file_io_engine: Option<crate::models::virtual_file::IoEngineKind>,
@@ -395,10 +386,6 @@ impl Default for ConfigToml {
control_plane_api_token: (None),
control_plane_emergency_mode: (false),
import_pgdata_upcall_api: (None),
import_pgdata_upcall_api_token: (None),
import_pgdata_aws_endpoint_url: (None),
heatmap_upload_concurrency: (DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
secondary_download_concurrency: (DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),

View File

@@ -48,7 +48,7 @@ pub struct ShardedRange<'a> {
// Calculate the size of a range within the blocks of the same relation, or spanning only the
// top page in the previous relation's space.
pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
fn contiguous_range_len(range: &Range<Key>) -> u32 {
debug_assert!(is_contiguous_range(range));
if range.start.field6 == 0xffffffff {
range.end.field6 + 1
@@ -67,7 +67,7 @@ pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
/// This matters, because:
/// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
/// - Within such ranges, we may calculate distances using simple subtraction of field6.
pub fn is_contiguous_range(range: &Range<Key>) -> bool {
fn is_contiguous_range(range: &Range<Key>) -> bool {
range.start.field1 == range.end.field1
&& range.start.field2 == range.end.field2
&& range.start.field3 == range.end.field3

View File

@@ -2,8 +2,6 @@ pub mod detach_ancestor;
pub mod partitioning;
pub mod utilization;
#[cfg(feature = "testing")]
use camino::Utf8PathBuf;
pub use utilization::PageserverUtilization;
use std::{
@@ -229,9 +227,6 @@ pub enum TimelineCreateRequestMode {
// we continue to accept it by having it here.
pg_version: Option<u32>,
},
ImportPgdata {
import_pgdata: TimelineCreateRequestModeImportPgdata,
},
// NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
// (serde picks the first matching enum variant, in declaration order).
Bootstrap {
@@ -241,42 +236,6 @@ pub enum TimelineCreateRequestMode {
},
}
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequestModeImportPgdata {
pub location: ImportPgdataLocation,
pub idempotency_key: ImportPgdataIdempotencyKey,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ImportPgdataLocation {
#[cfg(feature = "testing")]
LocalFs { path: Utf8PathBuf },
AwsS3 {
region: String,
bucket: String,
/// A better name for this would be `prefix`; changing requires coordination with cplane.
/// See <https://github.com/neondatabase/cloud/issues/20646>.
key: String,
},
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(transparent)]
pub struct ImportPgdataIdempotencyKey(pub String);
impl ImportPgdataIdempotencyKey {
pub fn random() -> Self {
use rand::{distributions::Alphanumeric, Rng};
Self(
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(20)
.map(char::from)
.collect(),
)
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct LsnLeaseRequest {
pub lsn: Lsn,

View File

@@ -1,12 +0,0 @@
[package]
name = "postgres_initdb"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
tokio.workspace = true
camino.workspace = true
thiserror.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -1,103 +0,0 @@
//! The canonical way we run `initdb` in Neon.
//!
//! initdb has implicit defaults that are dependent on the environment, e.g., locales & collations.
//!
//! This module's job is to eliminate the environment-dependence as much as possible.
use std::fmt;
use camino::Utf8Path;
pub struct RunInitdbArgs<'a> {
pub superuser: &'a str,
pub locale: &'a str,
pub initdb_bin: &'a Utf8Path,
pub pg_version: u32,
pub library_search_path: &'a Utf8Path,
pub pgdata: &'a Utf8Path,
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
Spawn(std::io::Error),
Failed {
status: std::process::ExitStatus,
stderr: Vec<u8>,
},
WaitOutput(std::io::Error),
Other(anyhow::Error),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Spawn(e) => write!(f, "Error spawning command: {:?}", e),
Error::Failed { status, stderr } => write!(
f,
"Command failed with status {:?}: {}",
status,
String::from_utf8_lossy(stderr)
),
Error::WaitOutput(e) => write!(f, "Error waiting for command output: {:?}", e),
Error::Other(e) => write!(f, "Error: {:?}", e),
}
}
}
pub async fn do_run_initdb(args: RunInitdbArgs<'_>) -> Result<(), Error> {
let RunInitdbArgs {
superuser,
locale,
initdb_bin: initdb_bin_path,
pg_version,
library_search_path,
pgdata,
} = args;
let mut initdb_command = tokio::process::Command::new(initdb_bin_path);
initdb_command
.args(["--pgdata", pgdata.as_ref()])
.args(["--username", superuser])
.args(["--encoding", "utf8"])
.args(["--locale", locale])
.arg("--no-instructions")
.arg("--no-sync")
.env_clear()
.env("LD_LIBRARY_PATH", library_search_path)
.env("DYLD_LIBRARY_PATH", library_search_path)
.stdin(std::process::Stdio::null())
// stdout invocation produces the same output every time, we don't need it
.stdout(std::process::Stdio::null())
// we would be interested in the stderr output, if there was any
.stderr(std::process::Stdio::piped());
// Before version 14, only the libc provide was available.
if pg_version > 14 {
// Version 17 brought with it a builtin locale provider which only provides
// C and C.UTF-8. While being safer for collation purposes since it is
// guaranteed to be consistent throughout a major release, it is also more
// performant.
let locale_provider = if pg_version >= 17 { "builtin" } else { "libc" };
initdb_command.args(["--locale-provider", locale_provider]);
}
let initdb_proc = initdb_command.spawn().map_err(Error::Spawn)?;
// Ideally we'd select here with the cancellation token, but the problem is that
// we can't safely terminate initdb: it launches processes of its own, and killing
// initdb doesn't kill them. After we return from this function, we want the target
// directory to be able to be cleaned up.
// See https://github.com/neondatabase/neon/issues/6385
let initdb_output = initdb_proc
.wait_with_output()
.await
.map_err(Error::WaitOutput)?;
if !initdb_output.status.success() {
return Err(Error::Failed {
status: initdb_output.status,
stderr: initdb_output.stderr,
});
}
Ok(())
}

View File

@@ -18,7 +18,6 @@ camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
hyper = { workspace = true, features = ["client"] }
futures.workspace = true
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }

View File

@@ -15,11 +15,8 @@ use std::time::SystemTime;
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
use anyhow::Result;
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::HttpClient;
use azure_core::TransportOptions;
use azure_core::{Continuable, RetryOptions};
use azure_identity::DefaultAzureCredential;
use azure_storage::CloudLocation;
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::CopyStatus;
use azure_storage_blobs::prelude::ClientBuilder;
@@ -27,7 +24,6 @@ use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerCl
use bytes::Bytes;
use futures::future::Either;
use futures::stream::Stream;
use futures::FutureExt;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http_types::{StatusCode, Url};
@@ -35,7 +31,6 @@ use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use utils::backoff;
use utils::backoff::exponential_backoff_duration_seconds;
use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
use crate::{
@@ -75,17 +70,8 @@ impl AzureBlobStorage {
StorageCredentials::token_credential(Arc::new(token_credential))
};
let location = match &azure_config.endpoint {
None => CloudLocation::Public { account },
Some(endpoint) => CloudLocation::Custom {
account,
uri: endpoint.clone(),
},
};
let builder = ClientBuilder::with_location(location, credentials)
// we have an outer retry
.retry(RetryOptions::none())
.transport(TransportOptions::new(reqwest_client(true)));
// we have an outer retry
let builder = ClientBuilder::new(account, credentials).retry(RetryOptions::none());
let client = builder.container_client(azure_config.container_name.to_owned());
@@ -258,15 +244,6 @@ impl AzureBlobStorage {
}
}
fn reqwest_client(allow_idle_connections: bool) -> Arc<dyn HttpClient> {
let max_idle = if allow_idle_connections { 8 } else { 0 };
let client = reqwest::ClientBuilder::new()
.pool_max_idle_per_host(max_idle)
.build()
.expect("failed to build `reqwest` client");
Arc::new(client)
}
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
let mut res = Metadata::new();
for (k, v) in metadata.0.into_iter() {
@@ -325,59 +302,40 @@ impl RemoteStorage for AzureBlobStorage {
let mut next_marker = None;
let mut timeout_try_cnt = 1;
'outer: loop {
let mut builder = builder.clone();
if let Some(marker) = next_marker.clone() {
builder = builder.marker(marker);
}
// Azure Blob Rust SDK does not expose the list blob API directly. Users have to use
// their pageable iterator wrapper that returns all keys as a stream. We want to have
// full control of paging, and therefore we only take the first item from the stream.
let mut response_stream = builder.into_stream();
let response = response_stream.next();
// Timeout mechanism: Azure client will sometimes stuck on a request, but retrying that request
// would immediately succeed. Therefore, we use exponential backoff timeout to retry the request.
// (Usually, exponential backoff is used to determine the sleep time between two retries.) We
// start with 10.0 second timeout, and double the timeout for each failure, up to 5 failures.
// timeout = min(5 * (1.0+1.0)^n, self.timeout).
let this_timeout = (5.0 * exponential_backoff_duration_seconds(timeout_try_cnt, 1.0, self.timeout.as_secs_f64())).min(self.timeout.as_secs_f64());
let response = tokio::time::timeout(Duration::from_secs_f64(this_timeout), response);
let response = response.map(|res| {
match res {
Ok(Some(Ok(res))) => Ok(Some(res)),
Ok(Some(Err(e))) => Err(to_download_error(e)),
Ok(None) => Ok(None),
Err(_elasped) => Err(DownloadError::Timeout),
}
let response = builder.into_stream();
let response = response.into_stream().map_err(to_download_error);
let response = tokio_stream::StreamExt::timeout(response, self.timeout);
let response = response.map(|res| match res {
Ok(res) => res,
Err(_elapsed) => Err(DownloadError::Timeout),
});
let mut response = std::pin::pin!(response);
let mut max_keys = max_keys.map(|mk| mk.get());
let next_item = tokio::select! {
op = response => op,
op = response.next() => Ok(op),
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
};
if let Err(DownloadError::Timeout) = &next_item {
timeout_try_cnt += 1;
if timeout_try_cnt <= 5 {
continue;
}
}
let next_item = next_item?;
if timeout_try_cnt >= 2 {
tracing::warn!("Azure Blob Storage list timed out and succeeded after {} tries", timeout_try_cnt);
}
timeout_try_cnt = 1;
}?;
let Some(entry) = next_item else {
// The list is complete, so yield it.
break;
};
let mut res = Listing::default();
let entry = match entry {
Ok(entry) => entry,
Err(e) => {
// The error is potentially retryable, so we must rewind the loop after yielding.
yield Err(e);
continue;
}
};
next_marker = entry.continuation();
let prefix_iter = entry
.blobs
@@ -393,7 +351,7 @@ impl RemoteStorage for AzureBlobStorage {
last_modified: k.properties.last_modified.into(),
size: k.properties.content_length,
}
);
);
for key in blob_iter {
res.keys.push(key);

View File

@@ -125,8 +125,6 @@ pub struct AzureConfig {
pub container_region: String,
/// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
pub prefix_in_container: Option<String>,
/// The endpoint to use. Use the default if None.
pub endpoint: Option<String>,
/// Azure has various limits on its API calls, we need not to exceed those.
/// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
#[serde(default = "default_remote_storage_azure_concurrency_limit")]
@@ -146,7 +144,6 @@ impl Debug for AzureConfig {
.field("storage_account", &self.storage_account)
.field("bucket_region", &self.container_region)
.field("prefix_in_container", &self.prefix_in_container)
.field("endpoint", &self.endpoint)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
@@ -299,7 +296,6 @@ timeout = '5s'";
storage_account: None,
container_region: "westeurope".into(),
prefix_in_container: None,
endpoint: None,
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
}),

View File

@@ -216,7 +216,6 @@ async fn create_azure_client(
storage_account: None,
container_region: remote_storage_azure_region,
prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
endpoint: None,
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response,
}),

View File

@@ -29,7 +29,6 @@ jsonwebtoken.workspace = true
nix.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
pprof.workspace = true
regex.workspace = true
routerify.workspace = true
serde.workspace = true

View File

@@ -1,8 +1,7 @@
use crate::auth::{AuthError, Claims, SwappableJwtAuth};
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use crate::http::request::{get_query_param, parse_query_param};
use anyhow::{anyhow, Context};
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
use anyhow::Context;
use hyper::header::{HeaderName, AUTHORIZATION};
use hyper::http::HeaderValue;
use hyper::Method;
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
@@ -13,13 +12,11 @@ use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tracing::{debug, info, info_span, warn, Instrument};
use std::future::Future;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;
use bytes::{Bytes, BytesMut};
use pprof::protos::Message as _;
use tokio::sync::{mpsc, Mutex};
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
@@ -331,82 +328,6 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
Ok(response)
}
/// Generates CPU profiles.
pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
enum Format {
Pprof,
Svg,
}
// Parameters.
let format = match get_query_param(&req, "format")?.as_deref() {
None => Format::Pprof,
Some("pprof") => Format::Pprof,
Some("svg") => Format::Svg,
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
};
let seconds = match parse_query_param(&req, "seconds")? {
None => 5,
Some(seconds @ 1..=30) => seconds,
Some(_) => return Err(ApiError::BadRequest(anyhow!("duration must be 1-30 secs"))),
};
let frequency_hz = match parse_query_param(&req, "frequency")? {
None => 99,
Some(1001..) => return Err(ApiError::BadRequest(anyhow!("frequency must be <=1000 Hz"))),
Some(frequency) => frequency,
};
// Only allow one profiler at a time.
static PROFILE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
let _lock = PROFILE_LOCK
.try_lock()
.map_err(|_| ApiError::Conflict("profiler already running".into()))?;
// Take the profile.
let report = tokio::task::spawn_blocking(move || {
let guard = pprof::ProfilerGuardBuilder::default()
.frequency(frequency_hz)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()?;
std::thread::sleep(Duration::from_secs(seconds));
guard.report().build()
})
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(|pprof_err| ApiError::InternalServerError(pprof_err.into()))?;
// Return the report in the requested format.
match format {
Format::Pprof => {
let mut body = Vec::new();
report
.pprof()
.map_err(|err| ApiError::InternalServerError(err.into()))?
.write_to_vec(&mut body)
.map_err(|err| ApiError::InternalServerError(err.into()))?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "application/octet-stream")
.header(CONTENT_DISPOSITION, "attachment; filename=\"profile.pb\"")
.body(Body::from(body))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
Format::Svg => {
let mut body = Vec::new();
report
.flamegraph(&mut body)
.map_err(|err| ApiError::InternalServerError(err.into()))?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "image/svg+xml")
.body(Body::from(body))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
}
}
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {

View File

@@ -30,7 +30,7 @@ pub fn parse_request_param<T: FromStr>(
}
}
pub fn get_query_param<'a>(
fn get_query_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<Option<Cow<'a, str>>, ApiError> {

View File

@@ -83,9 +83,7 @@ where
}
wake_these.push(self.heap.pop().unwrap().wake_channel);
}
if !wake_these.is_empty() {
self.update_status();
}
self.update_status();
wake_these
}

View File

@@ -43,7 +43,6 @@ postgres.workspace = true
postgres_backend.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true
postgres_initdb.workspace = true
rand.workspace = true
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
regex.workspace = true
@@ -69,7 +68,6 @@ url.workspace = true
walkdir.workspace = true
metrics.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
pageserver_compaction.workspace = true
postgres_connection.workspace = true
postgres_ffi.workspace = true

View File

@@ -144,10 +144,6 @@ pub struct PageServerConf {
/// JWT token for use with the control plane API.
pub control_plane_api_token: Option<SecretString>,
pub import_pgdata_upcall_api: Option<Url>,
pub import_pgdata_upcall_api_token: Option<SecretString>,
pub import_pgdata_aws_endpoint_url: Option<Url>,
/// If true, pageserver will make best-effort to operate without a control plane: only
/// for use in major incidents.
pub control_plane_emergency_mode: bool,
@@ -332,9 +328,6 @@ impl PageServerConf {
control_plane_api,
control_plane_api_token,
control_plane_emergency_mode,
import_pgdata_upcall_api,
import_pgdata_upcall_api_token,
import_pgdata_aws_endpoint_url,
heatmap_upload_concurrency,
secondary_download_concurrency,
ingest_batch_size,
@@ -390,9 +383,6 @@ impl PageServerConf {
timeline_offloading,
ephemeral_bytes_per_memory_kb,
server_side_batch_timeout,
import_pgdata_upcall_api,
import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from),
import_pgdata_aws_endpoint_url,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -3,6 +3,7 @@ mod list_writer;
mod validator;
use std::collections::HashMap;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;
@@ -157,6 +158,7 @@ pub struct DeletionQueueClient {
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
max_layer_generation_in_queue: Arc<AtomicU32>,
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
@@ -382,6 +384,17 @@ pub enum DeletionQueueError {
}
impl DeletionQueueClient {
/// Returns if there is any layer file <= `gen` in the deletion queue.
pub(crate) fn maybe_processing_generation(&self, gen: Generation) -> bool {
if let Some(gen) = gen.into() {
self.max_layer_generation_in_queue
.load(std::sync::atomic::Ordering::SeqCst)
>= gen
} else {
false
}
}
/// This is cancel-safe. If you drop the future before it completes, the message
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
/// we decide to do a deletion the decision is always final.
@@ -505,6 +518,13 @@ impl DeletionQueueClient {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
for (_, meta) in &layers {
let Some(gen) = meta.generation.into() else {
continue;
};
self.max_layer_generation_in_queue
.fetch_max(gen, std::sync::atomic::Ordering::SeqCst);
}
self.do_push(
&self.tx,
ListWriterQueueMessage::Delete(DeletionOp {
@@ -651,6 +671,7 @@ impl DeletionQueue {
tx,
executor_tx: executor_tx.clone(),
lsn_table: lsn_table.clone(),
max_layer_generation_in_queue: Arc::new(AtomicU32::new(0)),
},
cancel: cancel.clone(),
},
@@ -1265,6 +1286,7 @@ pub(crate) mod mock {
tx: self.tx.clone(),
executor_tx: self.executor_tx.clone(),
lsn_table: self.lsn_table.clone(),
max_layer_generation_in_queue: Arc::new(AtomicU32::new(0)),
}
}
}

View File

@@ -304,6 +304,20 @@ where
async fn flush(&mut self) -> Result<(), DeletionQueueError> {
tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
// Well, this check might be redundant, but it's a good sanity check. This fast
// path was added because `test_emergency_mode` wrt the initial barrier. The initial
// barrier will force and wait the deletion queue to be completely flushed, but if
// the control plane fails, we cannot really flush anything, and the whole test will
// get stuck. This check will prevent the test from getting stuck, and in reality, if
// there is nothing to delete, the initial barrier will not stuck.
if self.pending_lists.is_empty()
&& self.validated_lists.is_empty()
&& self.pending_key_count == 0
{
// Fast path: nothing to do
return Ok(());
}
// Issue any required generation validation calls to the control plane
self.validate().await?;

View File

@@ -623,8 +623,6 @@ paths:
existing_initdb_timeline_id:
type: string
format: hex
import_pgdata:
$ref: "#/components/schemas/TimelineCreateRequestImportPgdata"
responses:
"201":
description: Timeline was created, or already existed with matching parameters
@@ -981,34 +979,6 @@ components:
$ref: "#/components/schemas/TenantConfig"
effective_config:
$ref: "#/components/schemas/TenantConfig"
TimelineCreateRequestImportPgdata:
type: object
required:
- location
- idempotency_key
properties:
idempotency_key:
type: string
location:
$ref: "#/components/schemas/TimelineCreateRequestImportPgdataLocation"
TimelineCreateRequestImportPgdataLocation:
type: object
properties:
AwsS3:
$ref: "#/components/schemas/TimelineCreateRequestImportPgdataLocationAwsS3"
TimelineCreateRequestImportPgdataLocationAwsS3:
type: object
properties:
region:
type: string
bucket:
type: string
key:
type: string
required:
- region
- bucket
- key
TimelineInfo:
type: object
required:

View File

@@ -40,7 +40,6 @@ use pageserver_api::models::TenantSorting;
use pageserver_api::models::TenantState;
use pageserver_api::models::TimelineArchivalConfigRequest;
use pageserver_api::models::TimelineCreateRequestMode;
use pageserver_api::models::TimelineCreateRequestModeImportPgdata;
use pageserver_api::models::TimelinesInfoAndOffloaded;
use pageserver_api::models::TopTenantShardItem;
use pageserver_api::models::TopTenantShardsRequest;
@@ -56,7 +55,6 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::profile_cpu_handler;
use utils::http::endpoint::prometheus_metrics_handler;
use utils::http::endpoint::request_span;
use utils::http::request::must_parse_query_param;
@@ -82,7 +80,6 @@ use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::storage_layer::LayerName;
use crate::tenant::timeline::import_pgdata;
use crate::tenant::timeline::offload::offload_timeline;
use crate::tenant::timeline::offload::OffloadError;
use crate::tenant::timeline::CompactFlags;
@@ -128,7 +125,7 @@ pub struct State {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
allowlist_routes: &'static [&'static str],
allowlist_routes: Vec<Uri>,
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
@@ -149,13 +146,10 @@ impl State {
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
) -> anyhow::Result<Self> {
let allowlist_routes = &[
"/v1/status",
"/v1/doc",
"/swagger.yml",
"/metrics",
"/profile/cpu",
];
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect::<Vec<_>>();
Ok(Self {
conf,
tenant_manager,
@@ -582,35 +576,6 @@ async fn timeline_create_handler(
ancestor_timeline_id,
ancestor_start_lsn,
}),
TimelineCreateRequestMode::ImportPgdata {
import_pgdata:
TimelineCreateRequestModeImportPgdata {
location,
idempotency_key,
},
} => tenant::CreateTimelineParams::ImportPgdata(tenant::CreateTimelineParamsImportPgdata {
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new(
idempotency_key.0,
),
new_timeline_id,
location: {
use import_pgdata::index_part_format::Location;
use pageserver_api::models::ImportPgdataLocation;
match location {
#[cfg(feature = "testing")]
ImportPgdataLocation::LocalFs { path } => Location::LocalFs { path },
ImportPgdataLocation::AwsS3 {
region,
bucket,
key,
} => Location::AwsS3 {
region,
bucket,
key,
},
}
},
}),
};
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
@@ -3183,7 +3148,7 @@ pub fn make_router(
if auth.is_some() {
router = router.middleware(auth_middleware(|request| {
let state = get_state(request);
if state.allowlist_routes.contains(&request.uri().path()) {
if state.allowlist_routes.contains(request.uri()) {
None
} else {
state.auth.as_deref()
@@ -3202,7 +3167,6 @@ pub fn make_router(
Ok(router
.data(state)
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
.get("/v1/status", |r| api_handler(r, status_handler))
.put("/v1/failpoints", |r| {
testing_api_handler("manage failpoints", r, failpoints_handler)

View File

@@ -1068,26 +1068,21 @@ impl PageServerHandler {
));
}
// Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
if request_lsn == Lsn::INVALID {
return Err(PageStreamError::BadRequest(
"invalid LSN(0) in request".into(),
));
}
// Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
//
// We may have older data available, but we make a best effort to detect this case and return an error,
// to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
if request_lsn < **latest_gc_cutoff_lsn {
let gc_info = &timeline.gc_info.read().unwrap();
if !gc_info.leases.contains_key(&request_lsn) {
return Err(
// The requested LSN is below gc cutoff and is not guarded by a lease.
// Check explicitly for INVALID just to get a less scary error message if the
// request is obviously bogus
return Err(if request_lsn == Lsn::INVALID {
PageStreamError::BadRequest("invalid LSN(0) in request".into())
} else {
PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
).into())
);
});
}
}

View File

@@ -2276,9 +2276,9 @@ impl<'a> Version<'a> {
//--- Metadata structs stored in key-value pairs in the repository.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DbDirectory {
struct DbDirectory {
// (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
dbdirs: HashMap<(Oid, Oid), bool>,
}
// The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
@@ -2287,8 +2287,8 @@ pub(crate) struct DbDirectory {
// "pg_twophsae/0000000A000002E4".
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TwoPhaseDirectory {
pub(crate) xids: HashSet<TransactionId>,
struct TwoPhaseDirectory {
xids: HashSet<TransactionId>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -2297,12 +2297,12 @@ struct TwoPhaseDirectoryV17 {
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub(crate) struct RelDirectory {
struct RelDirectory {
// Set of relations that exist. (relfilenode, forknum)
//
// TODO: Store it as a btree or radix tree or something else that spans multiple
// key-value pairs, if you have a lot of relations
pub(crate) rels: HashSet<(Oid, u8)>,
rels: HashSet<(Oid, u8)>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -2311,9 +2311,9 @@ struct RelSizeEntry {
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub(crate) struct SlruSegmentDirectory {
struct SlruSegmentDirectory {
// Set of SLRU segments that exist.
pub(crate) segments: HashSet<u32>,
segments: HashSet<u32>,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]

View File

@@ -381,8 +381,6 @@ pub enum TaskKind {
UnitTest,
DetachAncestor,
ImportPgdata,
}
#[derive(Default)]

View File

@@ -43,9 +43,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
use tokio::task::JoinSet;
@@ -375,6 +373,7 @@ pub struct Tenant {
l0_flush_global_state: L0FlushGlobalState,
}
impl std::fmt::Debug for Tenant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
@@ -861,7 +860,6 @@ impl Debug for SetStoppingError {
pub(crate) enum CreateTimelineParams {
Bootstrap(CreateTimelineParamsBootstrap),
Branch(CreateTimelineParamsBranch),
ImportPgdata(CreateTimelineParamsImportPgdata),
}
#[derive(Debug)]
@@ -879,14 +877,7 @@ pub(crate) struct CreateTimelineParamsBranch {
pub(crate) ancestor_start_lsn: Option<Lsn>,
}
#[derive(Debug)]
pub(crate) struct CreateTimelineParamsImportPgdata {
pub(crate) new_timeline_id: TimelineId,
pub(crate) location: import_pgdata::index_part_format::Location,
pub(crate) idempotency_key: import_pgdata::index_part_format::IdempotencyKey,
}
/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`] in [`Tenant::start_creating_timeline`].
/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`].
///
/// Each [`Timeline`] object holds [`Self`] as an immutable property in [`Timeline::create_idempotency`].
///
@@ -916,50 +907,19 @@ pub(crate) enum CreateTimelineIdempotency {
ancestor_timeline_id: TimelineId,
ancestor_start_lsn: Lsn,
},
ImportPgdata(CreatingTimelineIdempotencyImportPgdata),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct CreatingTimelineIdempotencyImportPgdata {
idempotency_key: import_pgdata::index_part_format::IdempotencyKey,
}
/// What is returned by [`Tenant::start_creating_timeline`].
#[must_use]
enum StartCreatingTimelineResult {
CreateGuard(TimelineCreateGuard),
enum StartCreatingTimelineResult<'t> {
CreateGuard(TimelineCreateGuard<'t>),
Idempotent(Arc<Timeline>),
}
enum TimelineInitAndSyncResult {
ReadyToActivate(Arc<Timeline>),
NeedsSpawnImportPgdata(TimelineInitAndSyncNeedsSpawnImportPgdata),
}
impl TimelineInitAndSyncResult {
fn ready_to_activate(self) -> Option<Arc<Timeline>> {
match self {
Self::ReadyToActivate(timeline) => Some(timeline),
_ => None,
}
}
}
#[must_use]
struct TimelineInitAndSyncNeedsSpawnImportPgdata {
timeline: Arc<Timeline>,
import_pgdata: import_pgdata::index_part_format::Root,
guard: TimelineCreateGuard,
}
/// What is returned by [`Tenant::create_timeline`].
enum CreateTimelineResult {
Created(Arc<Timeline>),
Idempotent(Arc<Timeline>),
/// IMPORTANT: This [`Arc<Timeline>`] object is not in [`Tenant::timelines`] when
/// we return this result, nor will this concrete object ever be added there.
/// Cf method comment on [`Tenant::create_timeline_import_pgdata`].
ImportSpawned(Arc<Timeline>),
}
impl CreateTimelineResult {
@@ -967,19 +927,18 @@ impl CreateTimelineResult {
match self {
Self::Created(_) => "Created",
Self::Idempotent(_) => "Idempotent",
Self::ImportSpawned(_) => "ImportSpawned",
}
}
fn timeline(&self) -> &Arc<Timeline> {
match self {
Self::Created(t) | Self::Idempotent(t) | Self::ImportSpawned(t) => t,
Self::Created(t) | Self::Idempotent(t) => t,
}
}
/// Unit test timelines aren't activated, test has to do it if it needs to.
#[cfg(test)]
fn into_timeline_for_test(self) -> Arc<Timeline> {
match self {
Self::Created(t) | Self::Idempotent(t) | Self::ImportSpawned(t) => t,
Self::Created(t) | Self::Idempotent(t) => t,
}
}
}
@@ -1003,13 +962,33 @@ pub enum CreateTimelineError {
}
#[derive(thiserror::Error, Debug)]
pub enum InitdbError {
#[error("Operation was cancelled")]
Cancelled,
#[error(transparent)]
enum InitdbError {
Other(anyhow::Error),
#[error(transparent)]
Inner(postgres_initdb::Error),
Cancelled,
Spawn(std::io::Result<()>),
Failed(std::process::ExitStatus, Vec<u8>),
}
impl fmt::Display for InitdbError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
InitdbError::Cancelled => write!(f, "Operation was cancelled"),
InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
InitdbError::Failed(status, stderr) => write!(
f,
"Command failed with status {:?}: {}",
status,
String::from_utf8_lossy(stderr)
),
InitdbError::Other(e) => write!(f, "Error: {:?}", e),
}
}
}
impl From<std::io::Error> for InitdbError {
fn from(error: std::io::Error) -> Self {
InitdbError::Spawn(Err(error))
}
}
enum CreateTimelineCause {
@@ -1017,15 +996,6 @@ enum CreateTimelineCause {
Delete,
}
enum LoadTimelineCause {
Attach,
Unoffload,
ImportPgdata {
create_guard: TimelineCreateGuard,
activate: ActivateTimelineArgs,
},
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GcError {
// The tenant is shutting down
@@ -1102,35 +1072,24 @@ impl Tenant {
/// it is marked as Active.
#[allow(clippy::too_many_arguments)]
async fn timeline_init_and_sync(
self: &Arc<Self>,
&self,
timeline_id: TimelineId,
resources: TimelineResources,
mut index_part: IndexPart,
index_part: IndexPart,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
cause: LoadTimelineCause,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInitAndSyncResult> {
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
let import_pgdata = index_part.import_pgdata.take();
let idempotency = match &import_pgdata {
Some(import_pgdata) => {
CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata {
idempotency_key: import_pgdata.idempotency_key().clone(),
})
let idempotency = if metadata.ancestor_timeline().is_none() {
CreateTimelineIdempotency::Bootstrap {
pg_version: metadata.pg_version(),
}
None => {
if metadata.ancestor_timeline().is_none() {
CreateTimelineIdempotency::Bootstrap {
pg_version: metadata.pg_version(),
}
} else {
CreateTimelineIdempotency::Branch {
ancestor_timeline_id: metadata.ancestor_timeline().unwrap(),
ancestor_start_lsn: metadata.ancestor_lsn(),
}
}
} else {
CreateTimelineIdempotency::Branch {
ancestor_timeline_id: metadata.ancestor_timeline().unwrap(),
ancestor_start_lsn: metadata.ancestor_lsn(),
}
};
@@ -1162,91 +1121,39 @@ impl Tenant {
format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
})?;
match import_pgdata {
Some(import_pgdata) if !import_pgdata.is_done() => {
match cause {
LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (),
LoadTimelineCause::ImportPgdata { .. } => {
unreachable!("ImportPgdata should not be reloading timeline import is done and persisted as such in s3")
}
{
// avoiding holding it across awaits
let mut timelines_accessor = self.timelines.lock().unwrap();
match timelines_accessor.entry(timeline_id) {
// We should never try and load the same timeline twice during startup
Entry::Occupied(_) => {
unreachable!(
"Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
);
}
let mut guard = self.timelines_creating.lock().unwrap();
if !guard.insert(timeline_id) {
// We should never try and load the same timeline twice during startup
unreachable!("Timeline {tenant_id}/{timeline_id} is already being created")
Entry::Vacant(v) => {
v.insert(Arc::clone(&timeline));
timeline.maybe_spawn_flush_loop();
}
let timeline_create_guard = TimelineCreateGuard {
_tenant_gate_guard: self.gate.enter()?,
owning_tenant: self.clone(),
timeline_id,
idempotency,
// The users of this specific return value don't need the timline_path in there.
timeline_path: timeline
.conf
.timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id),
};
Ok(TimelineInitAndSyncResult::NeedsSpawnImportPgdata(
TimelineInitAndSyncNeedsSpawnImportPgdata {
timeline,
import_pgdata,
guard: timeline_create_guard,
},
))
}
Some(_) | None => {
{
let mut timelines_accessor = self.timelines.lock().unwrap();
match timelines_accessor.entry(timeline_id) {
// We should never try and load the same timeline twice during startup
Entry::Occupied(_) => {
unreachable!(
"Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
);
}
Entry::Vacant(v) => {
v.insert(Arc::clone(&timeline));
timeline.maybe_spawn_flush_loop();
}
}
}
};
// Sanity check: a timeline should have some content.
anyhow::ensure!(
ancestor.is_some()
|| timeline
.layers
.read()
.await
.layer_map()
.expect("currently loading, layer manager cannot be shutdown already")
.iter_historic_layers()
.next()
.is_some(),
"Timeline has no ancestor and no layer files"
);
// Sanity check: a timeline should have some content.
anyhow::ensure!(
ancestor.is_some()
|| timeline
.layers
.read()
.await
.layer_map()
.expect("currently loading, layer manager cannot be shutdown already")
.iter_historic_layers()
.next()
.is_some(),
"Timeline has no ancestor and no layer files"
);
match cause {
LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (),
LoadTimelineCause::ImportPgdata {
create_guard,
activate,
} => {
// TODO: see the comment in the task code above how I'm not so certain
// it is safe to activate here because of concurrent shutdowns.
match activate {
ActivateTimelineArgs::Yes { broker_client } => {
info!("activating timeline after reload from pgdata import task");
timeline.activate(self.clone(), broker_client, None, ctx);
}
ActivateTimelineArgs::No => (),
}
drop(create_guard);
}
}
Ok(TimelineInitAndSyncResult::ReadyToActivate(timeline))
}
}
Ok(())
}
/// Attach a tenant that's available in cloud storage.
@@ -1671,46 +1578,24 @@ impl Tenant {
}
// TODO again handle early failure
let effect = self
.load_remote_timeline(
timeline_id,
index_part,
remote_metadata,
TimelineResources {
remote_client,
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
},
LoadTimelineCause::Attach,
ctx,
self.load_remote_timeline(
timeline_id,
index_part,
remote_metadata,
TimelineResources {
remote_client,
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
},
ctx,
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_shard_id
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_shard_id
)
})?;
match effect {
TimelineInitAndSyncResult::ReadyToActivate(_) => {
// activation happens later, on Tenant::activate
}
TimelineInitAndSyncResult::NeedsSpawnImportPgdata(
TimelineInitAndSyncNeedsSpawnImportPgdata {
timeline,
import_pgdata,
guard,
},
) => {
tokio::task::spawn(self.clone().create_timeline_import_pgdata_task(
timeline,
import_pgdata,
ActivateTimelineArgs::No,
guard,
));
}
}
})?;
}
// Walk through deleted timelines, resume deletion
@@ -1834,14 +1719,13 @@ impl Tenant {
#[instrument(skip_all, fields(timeline_id=%timeline_id))]
async fn load_remote_timeline(
self: &Arc<Self>,
&self,
timeline_id: TimelineId,
index_part: IndexPart,
remote_metadata: TimelineMetadata,
resources: TimelineResources,
cause: LoadTimelineCause,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInitAndSyncResult> {
) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
info!("downloading index file for timeline {}", timeline_id);
@@ -1868,7 +1752,6 @@ impl Tenant {
index_part,
remote_metadata,
ancestor,
cause,
ctx,
)
.await
@@ -2055,7 +1938,6 @@ impl Tenant {
TimelineArchivalError::Other(anyhow::anyhow!("Timeline already exists"))
}
TimelineExclusionError::Other(e) => TimelineArchivalError::Other(e),
TimelineExclusionError::ShuttingDown => TimelineArchivalError::Cancelled,
})?;
let timeline_preload = self
@@ -2094,7 +1976,6 @@ impl Tenant {
index_part,
remote_metadata,
timeline_resources,
LoadTimelineCause::Unoffload,
&ctx,
)
.await
@@ -2332,7 +2213,7 @@ impl Tenant {
///
/// Tests should use `Tenant::create_test_timeline` to set up the minimum required metadata keys.
pub(crate) async fn create_empty_timeline(
self: &Arc<Self>,
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
@@ -2382,7 +2263,7 @@ impl Tenant {
// Our current tests don't need the background loops.
#[cfg(test)]
pub async fn create_test_timeline(
self: &Arc<Self>,
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
@@ -2421,7 +2302,7 @@ impl Tenant {
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
pub async fn create_test_timeline_with_layers(
self: &Arc<Self>,
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
@@ -2558,16 +2439,6 @@ impl Tenant {
self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
.await?
}
CreateTimelineParams::ImportPgdata(params) => {
self.create_timeline_import_pgdata(
params,
ActivateTimelineArgs::Yes {
broker_client: broker_client.clone(),
},
ctx,
)
.await?
}
};
// At this point we have dropped our guard on [`Self::timelines_creating`], and
@@ -2610,202 +2481,11 @@ impl Tenant {
);
timeline
}
CreateTimelineResult::ImportSpawned(timeline) => {
info!("import task spawned, timeline will become visible and activated once the import is done");
timeline
}
};
Ok(activated_timeline)
}
/// The returned [`Arc<Timeline>`] is NOT in the [`Tenant::timelines`] map until the import
/// completes in the background. A DIFFERENT [`Arc<Timeline>`] will be inserted into the
/// [`Tenant::timelines`] map when the import completes.
/// We only return an [`Arc<Timeline>`] here so the API handler can create a [`pageserver_api::models::TimelineInfo`]
/// for the response.
async fn create_timeline_import_pgdata(
self: &Arc<Tenant>,
params: CreateTimelineParamsImportPgdata,
activate: ActivateTimelineArgs,
ctx: &RequestContext,
) -> Result<CreateTimelineResult, CreateTimelineError> {
let CreateTimelineParamsImportPgdata {
new_timeline_id,
location,
idempotency_key,
} = params;
let started_at = chrono::Utc::now().naive_utc();
//
// There's probably a simpler way to upload an index part, but, remote_timeline_client
// is the canonical way we do it.
// - create an empty timeline in-memory
// - use its remote_timeline_client to do the upload
// - dispose of the uninit timeline
// - keep the creation guard alive
let timeline_create_guard = match self
.start_creating_timeline(
new_timeline_id,
CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata {
idempotency_key: idempotency_key.clone(),
}),
)
.await?
{
StartCreatingTimelineResult::CreateGuard(guard) => guard,
StartCreatingTimelineResult::Idempotent(timeline) => {
return Ok(CreateTimelineResult::Idempotent(timeline))
}
};
let mut uninit_timeline = {
let this = &self;
let initdb_lsn = Lsn(0);
let _ctx = ctx;
async move {
let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to
// make it valid, before calling finish_creation()
Lsn(0),
None,
None,
Lsn(0),
initdb_lsn,
initdb_lsn,
15,
);
this.prepare_new_timeline(
new_timeline_id,
&new_metadata,
timeline_create_guard,
initdb_lsn,
None,
)
.await
}
}
.await?;
let in_progress = import_pgdata::index_part_format::InProgress {
idempotency_key,
location,
started_at,
};
let index_part = import_pgdata::index_part_format::Root::V1(
import_pgdata::index_part_format::V1::InProgress(in_progress),
);
uninit_timeline
.raw_timeline()
.unwrap()
.remote_client
.schedule_index_upload_for_import_pgdata_state_update(Some(index_part.clone()))?;
// wait_completion happens in caller
let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself();
tokio::spawn(self.clone().create_timeline_import_pgdata_task(
timeline.clone(),
index_part,
activate,
timeline_create_guard,
));
// NB: the timeline doesn't exist in self.timelines at this point
Ok(CreateTimelineResult::ImportSpawned(timeline))
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))]
async fn create_timeline_import_pgdata_task(
self: Arc<Tenant>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
timeline_create_guard: TimelineCreateGuard,
) {
debug_assert_current_span_has_tenant_and_timeline_id();
info!("starting");
scopeguard::defer! {info!("exiting")};
let res = self
.create_timeline_import_pgdata_task_impl(
timeline,
index_part,
activate,
timeline_create_guard,
)
.await;
if let Err(err) = &res {
error!(?err, "task failed");
// TODO sleep & retry, sensitive to tenant shutdown
// TODO: allow timeline deletion requests => should cancel the task
}
}
async fn create_timeline_import_pgdata_task_impl(
self: Arc<Tenant>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
timeline_create_guard: TimelineCreateGuard,
) -> Result<(), anyhow::Error> {
let ctx = RequestContext::new(TaskKind::ImportPgdata, DownloadBehavior::Warn);
info!("importing pgdata");
import_pgdata::doit(&timeline, index_part, &ctx, self.cancel.clone())
.await
.context("import")?;
info!("import done");
//
// Reload timeline from remote.
// This proves that the remote state is attachable, and it reuses the code.
//
// TODO: think about whether this is safe to do with concurrent Tenant::shutdown.
// timeline_create_guard hols the tenant gate open, so, shutdown cannot _complete_ until we exit.
// But our activate() call might launch new background tasks after Tenant::shutdown
// already went past shutting down the Tenant::timelines, which this timeline here is no part of.
// I think the same problem exists with the bootstrap & branch mgmt API tasks (tenant shutting
// down while bootstrapping/branching + activating), but, the race condition is much more likely
// to manifest because of the long runtime of this import task.
// in theory this shouldn't even .await anything except for coop yield
info!("shutting down timeline");
timeline.shutdown(ShutdownMode::Hard).await;
info!("timeline shut down, reloading from remote");
// TODO: we can't do the following check because create_timeline_import_pgdata must return an Arc<Timeline>
// let Some(timeline) = Arc::into_inner(timeline) else {
// anyhow::bail!("implementation error: timeline that we shut down was still referenced from somewhere");
// };
let timeline_id = timeline.timeline_id;
// load from object storage like Tenant::attach does
let resources = self.build_timeline_resources(timeline_id);
let index_part = resources
.remote_client
.download_index_file(&self.cancel)
.await?;
let index_part = match index_part {
MaybeDeletedIndexPart::Deleted(_) => {
// likely concurrent delete call, cplane should prevent this
anyhow::bail!("index part says deleted but we are not done creating yet, this should not happen but")
}
MaybeDeletedIndexPart::IndexPart(p) => p,
};
let metadata = index_part.metadata.clone();
self
.load_remote_timeline(timeline_id, index_part, metadata, resources, LoadTimelineCause::ImportPgdata{
create_guard: timeline_create_guard, activate, }, &ctx)
.await?
.ready_to_activate()
.context("implementation error: reloaded timeline still needs import after import reported success")?;
anyhow::Ok(())
}
pub(crate) async fn delete_timeline(
self: Arc<Self>,
timeline_id: TimelineId,
@@ -3657,13 +3337,6 @@ where
Ok(result)
}
enum ActivateTimelineArgs {
Yes {
broker_client: storage_broker::BrokerClientChannel,
},
No,
}
impl Tenant {
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
self.tenant_conf.load().tenant_conf.clone()
@@ -3847,7 +3520,6 @@ impl Tenant {
/// `validate_ancestor == false` is used when a timeline is created for deletion
/// and we might not have the ancestor present anymore which is fine for to be
/// deleted timelines.
#[allow(clippy::too_many_arguments)]
fn create_timeline_struct(
&self,
new_timeline_id: TimelineId,
@@ -4611,17 +4283,16 @@ impl Tenant {
/// If the timeline was already created in the meantime, we check whether this
/// request conflicts or is idempotent , based on `state`.
async fn start_creating_timeline(
self: &Arc<Self>,
&self,
new_timeline_id: TimelineId,
idempotency: CreateTimelineIdempotency,
) -> Result<StartCreatingTimelineResult, CreateTimelineError> {
) -> Result<StartCreatingTimelineResult<'_>, CreateTimelineError> {
let allow_offloaded = false;
match self.create_timeline_create_guard(new_timeline_id, idempotency, allow_offloaded) {
Ok(create_guard) => {
pausable_failpoint!("timeline-creation-after-uninit");
Ok(StartCreatingTimelineResult::CreateGuard(create_guard))
}
Err(TimelineExclusionError::ShuttingDown) => Err(CreateTimelineError::ShuttingDown),
Err(TimelineExclusionError::AlreadyCreating) => {
// Creation is in progress, we cannot create it again, and we cannot
// check if this request matches the existing one, so caller must try
@@ -4911,7 +4582,7 @@ impl Tenant {
&'a self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
create_guard: TimelineCreateGuard,
create_guard: TimelineCreateGuard<'a>,
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<UninitializedTimeline<'a>> {
@@ -4971,7 +4642,7 @@ impl Tenant {
/// The `allow_offloaded` parameter controls whether to tolerate the existence of
/// offloaded timelines or not.
fn create_timeline_create_guard(
self: &Arc<Self>,
&self,
timeline_id: TimelineId,
idempotency: CreateTimelineIdempotency,
allow_offloaded: bool,
@@ -5231,16 +4902,48 @@ async fn run_initdb(
let _permit = INIT_DB_SEMAPHORE.acquire().await;
let res = postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
superuser: &conf.superuser,
locale: &conf.locale,
initdb_bin: &initdb_bin_path,
pg_version,
library_search_path: &initdb_lib_dir,
pgdata: initdb_target_dir,
})
.await
.map_err(InitdbError::Inner);
let mut initdb_command = tokio::process::Command::new(&initdb_bin_path);
initdb_command
.args(["--pgdata", initdb_target_dir.as_ref()])
.args(["--username", &conf.superuser])
.args(["--encoding", "utf8"])
.args(["--locale", &conf.locale])
.arg("--no-instructions")
.arg("--no-sync")
.env_clear()
.env("LD_LIBRARY_PATH", &initdb_lib_dir)
.env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
.stdin(std::process::Stdio::null())
// stdout invocation produces the same output every time, we don't need it
.stdout(std::process::Stdio::null())
// we would be interested in the stderr output, if there was any
.stderr(std::process::Stdio::piped());
// Before version 14, only the libc provide was available.
if pg_version > 14 {
// Version 17 brought with it a builtin locale provider which only provides
// C and C.UTF-8. While being safer for collation purposes since it is
// guaranteed to be consistent throughout a major release, it is also more
// performant.
let locale_provider = if pg_version >= 17 { "builtin" } else { "libc" };
initdb_command.args(["--locale-provider", locale_provider]);
}
let initdb_proc = initdb_command.spawn()?;
// Ideally we'd select here with the cancellation token, but the problem is that
// we can't safely terminate initdb: it launches processes of its own, and killing
// initdb doesn't kill them. After we return from this function, we want the target
// directory to be able to be cleaned up.
// See https://github.com/neondatabase/neon/issues/6385
let initdb_output = initdb_proc.wait_with_output().await?;
if !initdb_output.status.success() {
return Err(InitdbError::Failed(
initdb_output.status,
initdb_output.stderr,
));
}
// This isn't true cancellation support, see above. Still return an error to
// excercise the cancellation code path.
@@ -5248,7 +4951,7 @@ async fn run_initdb(
return Err(InitdbError::Cancelled);
}
res
Ok(())
}
/// Dump contents of a layer file to stdout.

View File

@@ -199,7 +199,7 @@ use utils::backoff::{
use utils::pausable_failpoint;
use utils::shard::ShardNumber;
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::Duration;
@@ -223,7 +223,7 @@ use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::download::download_retry;
use crate::tenant::storage_layer::AsLayerDesc;
use crate::tenant::upload_queue::{Delete, OpType, UploadQueueStoppedDeletable};
use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{
config::PageServerConf,
@@ -244,8 +244,7 @@ use self::index::IndexPart;
use super::config::AttachedLocationConfig;
use super::metadata::MetadataUpdate;
use super::storage_layer::{Layer, LayerName, ResidentLayer};
use super::timeline::import_pgdata;
use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
use super::upload_queue::{BarrierType, NotInitialized, SetDeletedFlagProgress};
use super::{DeleteTimelineError, Generation};
pub(crate) use download::{
@@ -814,18 +813,6 @@ impl RemoteTimelineClient {
Ok(need_wait)
}
/// Launch an index-file upload operation in the background, setting `import_pgdata` field.
pub(crate) fn schedule_index_upload_for_import_pgdata_state_update(
self: &Arc<Self>,
state: Option<import_pgdata::index_part_format::Root>,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.dirty.import_pgdata = state;
self.schedule_index_upload(upload_queue)?;
Ok(())
}
///
/// Launch an index-file upload operation in the background, if necessary.
///
@@ -909,7 +896,7 @@ impl RemoteTimelineClient {
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
Some(self.schedule_wait_barrier0(upload_queue))
}
};
@@ -948,7 +935,7 @@ impl RemoteTimelineClient {
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
Some(self.schedule_wait_barrier0(upload_queue))
}
};
@@ -986,7 +973,9 @@ impl RemoteTimelineClient {
match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
(x, y) if wanted(x) && !wanted(y) => {
Some(self.schedule_wait_barrier0(upload_queue))
}
// Usual case: !wanted(x) && !wanted(y)
//
// Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
@@ -1003,7 +992,7 @@ impl RemoteTimelineClient {
.map(|x| x.with_reason(reason))
.or_else(|| Some(index::GcBlocking::started_now_for(reason)));
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
Some(self.schedule_wait_barrier0(upload_queue))
}
}
};
@@ -1046,7 +1035,9 @@ impl RemoteTimelineClient {
match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
(x, y) if wanted(x) && !wanted(y) => {
Some(self.schedule_wait_barrier0(upload_queue))
}
(x, y) => {
if !wanted(x) && wanted(y) {
warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)");
@@ -1057,7 +1048,7 @@ impl RemoteTimelineClient {
assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
// FIXME: bogus ?
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
Some(self.schedule_wait_barrier0(upload_queue))
}
}
};
@@ -1103,7 +1094,7 @@ impl RemoteTimelineClient {
"scheduled layer file upload {layer}",
);
let op = UploadOp::UploadLayer(layer, metadata, None);
let op = UploadOp::UploadLayer(layer, metadata);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
}
@@ -1313,7 +1304,7 @@ impl RemoteTimelineClient {
let upload_queue = guard
.initialized_mut()
.map_err(WaitCompletionError::NotInitialized)?;
self.schedule_barrier0(upload_queue)
self.schedule_wait_barrier0(upload_queue)
};
Self::wait_completion0(receiver).await
@@ -1329,19 +1320,32 @@ impl RemoteTimelineClient {
Ok(())
}
pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
pub fn schedule_initial_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
self.schedule_barrier(BarrierType::Initial)
}
fn schedule_barrier(self: &Arc<Self>, barrier: BarrierType) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue);
self.schedule_barrier0(upload_queue, barrier);
Ok(())
}
/// Schedule a barrier to wait for all previously scheduled operations to complete.
fn schedule_wait_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
) -> tokio::sync::watch::Receiver<()> {
self.schedule_barrier0(upload_queue, BarrierType::Normal)
}
fn schedule_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
barrier: BarrierType,
) -> tokio::sync::watch::Receiver<()> {
let (sender, receiver) = tokio::sync::watch::channel(());
let barrier_op = UploadOp::Barrier(sender);
let barrier_op = UploadOp::Barrier(sender, barrier);
upload_queue.queued_operations.push_back(barrier_op);
// Don't count this kind of operation!
@@ -1809,21 +1813,24 @@ impl RemoteTimelineClient {
while let Some(next_op) = upload_queue.queued_operations.front() {
// Can we run this task now?
let can_run_now = match next_op {
UploadOp::UploadLayer(..) => {
// Can always be scheduled.
true
UploadOp::UploadLayer(_, meta) => {
// Can always be scheduled except when there's a barrier, or if the deletion queue doesn't contain any file with the same/lower generation.
upload_queue.num_inprogress_barriers == 0
|| !self
.deletion_queue_client
.maybe_processing_generation(meta.generation)
}
UploadOp::UploadMetadata { .. } => {
// These can only be performed after all the preceding operations
// have finished.
upload_queue.inprogress_tasks.is_empty()
}
UploadOp::Delete(..) => {
UploadOp::Delete(_) => {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}
UploadOp::Barrier(_) | UploadOp::Shutdown => {
UploadOp::Barrier(_, _) | UploadOp::Shutdown => {
upload_queue.inprogress_tasks.is_empty()
}
};
@@ -1846,38 +1853,30 @@ impl RemoteTimelineClient {
}
// We can launch this task. Remove it from the queue first.
let mut next_op = upload_queue.queued_operations.pop_front().unwrap();
let next_op = upload_queue.queued_operations.pop_front().unwrap();
debug!("starting op: {}", next_op);
// Update the counters and prepare
match &mut next_op {
UploadOp::UploadLayer(layer, meta, mode) => {
if upload_queue
.recently_deleted
.remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
{
*mode = Some(OpType::FlushDeletion);
} else {
*mode = Some(OpType::MayReorder)
}
// Update the counters
match next_op {
UploadOp::UploadLayer(_, _) => {
upload_queue.num_inprogress_layer_uploads += 1;
}
UploadOp::UploadMetadata { .. } => {
upload_queue.num_inprogress_metadata_uploads += 1;
}
UploadOp::Delete(Delete { layers }) => {
for (name, meta) in layers {
upload_queue
.recently_deleted
.insert((name.clone(), meta.generation));
}
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
UploadOp::Barrier(sender, BarrierType::Normal) => {
// For other barriers, simply send back the ack.
sender.send_replace(());
continue;
}
UploadOp::Barrier(_, BarrierType::Initial) => {
// For initial barrier, we need to wait for deletions.
upload_queue.num_inprogress_barriers += 1;
}
UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
};
@@ -1947,66 +1946,7 @@ impl RemoteTimelineClient {
}
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
if let Some(OpType::FlushDeletion) = mode {
if self.config.read().unwrap().block_deletions {
// Of course, this is not efficient... but usually the queue should be empty.
let mut queue_locked = self.upload_queue.lock().unwrap();
let mut detected = false;
if let Ok(queue) = queue_locked.initialized_mut() {
for list in queue.blocked_deletions.iter_mut() {
list.layers.retain(|(name, meta)| {
if name == &layer.layer_desc().layer_name()
&& meta.generation == layer_metadata.generation
{
detected = true;
// remove the layer from deletion queue
false
} else {
// keep the layer
true
}
});
}
}
if detected {
info!(
"cancelled blocked deletion of layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
}
} else {
// TODO: we did not guarantee that upload task starts after deletion task, so there could be possibly race conditions
// that we still get the layer deleted. But this only happens if someone creates a layer immediately after it's deleted,
// which is not possible in the current system.
info!(
"waiting for deletion queue flush to complete before uploading layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
{
// We are going to flush, we can clean up the recently deleted list.
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
queue.recently_deleted.clear();
}
}
if let Err(e) = self.deletion_queue_client.flush_execute().await {
warn!(
"failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
} else {
info!(
"done flushing deletion queue before uploading layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
}
}
}
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
let local_path = layer.local_path();
// We should only be uploading layers created by this `Tenant`'s lifetime, so
@@ -2078,7 +2018,6 @@ impl RemoteTimelineClient {
}
Ok(())
} else {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_shard_id,
@@ -2090,7 +2029,12 @@ impl RemoteTimelineClient {
.map_err(|e| anyhow::anyhow!(e))
}
}
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
UploadOp::Barrier(_, _) => self
.deletion_queue_client
.flush_execute()
.await
.map_err(|e| anyhow::anyhow!(e)),
unexpected @ UploadOp::Shutdown => {
// unreachable. Barrier operations are handled synchronously in
// launch_queued_tasks
warn!("unexpected {unexpected:?} operation in perform_upload_task");
@@ -2100,6 +2044,10 @@ impl RemoteTimelineClient {
match upload_result {
Ok(()) => {
if let UploadOp::Barrier(sender, _) = &task.op {
// Notify the caller that the barrier has been reached.
sender.send(()).ok();
}
break;
}
Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
@@ -2170,7 +2118,7 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);
let lsn_update = match task.op {
UploadOp::UploadLayer(_, _, _) => {
UploadOp::UploadLayer(_, _) => {
upload_queue.num_inprogress_layer_uploads -= 1;
None
}
@@ -2211,7 +2159,11 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_deletions -= 1;
None
}
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
UploadOp::Barrier(..) => {
upload_queue.num_inprogress_barriers -= 1;
None
}
UploadOp::Shutdown => unreachable!(),
};
// Launch any queued tasks that were unblocked by this one.
@@ -2247,7 +2199,7 @@ impl RemoteTimelineClient {
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
let res = match op {
UploadOp::UploadLayer(_, m, _) => (
UploadOp::UploadLayer(_, m) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
@@ -2337,6 +2289,7 @@ impl RemoteTimelineClient {
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
num_inprogress_barriers: 0,
inprogress_tasks: HashMap::default(),
queued_operations: VecDeque::default(),
#[cfg(feature = "testing")]
@@ -2344,7 +2297,6 @@ impl RemoteTimelineClient {
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
recently_deleted: HashSet::new(),
};
let upload_queue = std::mem::replace(
@@ -2367,7 +2319,8 @@ impl RemoteTimelineClient {
assert_eq!(
qi.num_inprogress_layer_uploads
+ qi.num_inprogress_metadata_uploads
+ qi.num_inprogress_deletions,
+ qi.num_inprogress_deletions
+ qi.num_inprogress_barriers,
qi.inprogress_tasks.len()
);

View File

@@ -706,7 +706,7 @@ where
.and_then(|x| x)
}
pub(crate) async fn download_retry_forever<T, O, F>(
async fn download_retry_forever<T, O, F>(
op: O,
description: &str,
cancel: &CancellationToken,

View File

@@ -12,7 +12,6 @@ use utils::id::TimelineId;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::LayerName;
use crate::tenant::timeline::import_pgdata;
use crate::tenant::Generation;
use pageserver_api::shard::ShardIndex;
@@ -38,13 +37,6 @@ pub struct IndexPart {
#[serde(skip_serializing_if = "Option::is_none")]
pub archived_at: Option<NaiveDateTime>,
/// This field supports import-from-pgdata ("fast imports" platform feature).
/// We don't currently use fast imports, so, this field is None for all production timelines.
/// See <https://github.com/neondatabase/neon/pull/9218> for more information.
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub import_pgdata: Option<import_pgdata::index_part_format::Root>,
/// Per layer file name metadata, which can be present for a present or missing layer file.
///
/// Older versions of `IndexPart` will not have this property or have only a part of metadata
@@ -98,11 +90,10 @@ impl IndexPart {
/// - 7: metadata_bytes is no longer written, but still read
/// - 8: added `archived_at`
/// - 9: +gc_blocking
/// - 10: +import_pgdata
const LATEST_VERSION: usize = 10;
const LATEST_VERSION: usize = 9;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -117,7 +108,6 @@ impl IndexPart {
lineage: Default::default(),
gc_blocking: None,
last_aux_file_policy: None,
import_pgdata: None,
}
}
@@ -391,7 +381,6 @@ mod tests {
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
import_pgdata: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -436,7 +425,6 @@ mod tests {
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
import_pgdata: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -482,7 +470,6 @@ mod tests {
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
import_pgdata: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -531,7 +518,6 @@ mod tests {
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
import_pgdata: None,
};
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -575,7 +561,6 @@ mod tests {
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
import_pgdata: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -622,7 +607,6 @@ mod tests {
},
gc_blocking: None,
last_aux_file_policy: None,
import_pgdata: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -674,7 +658,6 @@ mod tests {
},
gc_blocking: None,
last_aux_file_policy: Some(AuxFilePolicy::V2),
import_pgdata: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -731,7 +714,6 @@ mod tests {
lineage: Default::default(),
gc_blocking: None,
last_aux_file_policy: Default::default(),
import_pgdata: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -789,7 +771,6 @@ mod tests {
lineage: Default::default(),
gc_blocking: None,
last_aux_file_policy: Default::default(),
import_pgdata: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -852,83 +833,6 @@ mod tests {
}),
last_aux_file_policy: Default::default(),
archived_at: None,
import_pgdata: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v10_importpgdata_is_parsed() {
let example = r#"{
"version": 10,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123",
"reasons": ["DetachAncestor"]
},
"import_pgdata": {
"V1": {
"Done": {
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
"started_at": "2024-11-13T09:23:42.123",
"finished_at": "2024-11-13T09:42:23.123"
}
}
}
}"#;
let expected = IndexPart {
version: 10,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
}),
last_aux_file_policy: Default::default(),
archived_at: None,
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
})))
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();

View File

@@ -4,7 +4,6 @@ pub mod delete;
pub(crate) mod detach_ancestor;
mod eviction_task;
pub(crate) mod handle;
pub(crate) mod import_pgdata;
mod init;
pub mod layer_manager;
pub(crate) mod logical_size;
@@ -2086,11 +2085,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length_for_ts)
}
pub(crate) fn is_gc_blocked_by_lsn_lease_deadline(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf.is_gc_blocked_by_lsn_lease_deadline()
}
pub(crate) fn get_lazy_slru_download(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2652,9 +2646,10 @@ impl Timeline {
// See https://github.com/neondatabase/neon/issues/5878
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
// TODO: this is basically a no-op now, should we remove it?
self.remote_client.schedule_barrier()?;
// (1) and (4) ONLY IF generation number gets bumped. There are some cases where
// we load a tenant without bumping the generation number (i.e., detach ancestor
// and timeline offload/un-offload). In those cases, we need to rely on the barrier.
self.remote_client.schedule_initial_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
@@ -2709,23 +2704,20 @@ impl Timeline {
{
Some(cancel) => cancel.cancel(),
None => {
match self.current_state() {
TimelineState::Broken { .. } | TimelineState::Stopping => {
// Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
// Don't make noise.
}
TimelineState::Loading => {
// Import does not return an activated timeline.
info!("discarding priority boost for logical size calculation because timeline is not yet active");
}
TimelineState::Active => {
// activation should be setting the once cell
warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
debug_assert!(false);
}
let state = self.current_state();
if matches!(
state,
TimelineState::Broken { .. } | TimelineState::Stopping
) {
// Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
// Don't make noise.
} else {
warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
debug_assert!(false);
}
}
}
};
}
}

View File

@@ -1,218 +0,0 @@
use std::sync::Arc;
use anyhow::{bail, Context};
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::{info, info_span, Instrument};
use utils::lsn::Lsn;
use crate::{context::RequestContext, tenant::metadata::TimelineMetadata};
use super::Timeline;
mod flow;
mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;
pub(crate) mod upcall_api;
pub async fn doit(
timeline: &Arc<Timeline>,
index_part: index_part_format::Root,
ctx: &RequestContext,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let index_part_format::Root::V1(v1) = index_part;
let index_part_format::InProgress {
location,
idempotency_key,
started_at,
} = match v1 {
index_part_format::V1::Done(_) => return Ok(()),
index_part_format::V1::InProgress(in_progress) => in_progress,
};
let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
info!("get spec early so we know we'll be able to upcall when done");
let Some(spec) = storage.get_spec().await? else {
bail!("spec not found")
};
let upcall_client =
upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?;
//
// send an early progress update to clean up k8s job early and generate potentially useful logs
//
info!("send early progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("early_progress_update"))
.await?;
let status_prefix = RemotePath::from_string("status").unwrap();
//
// See if shard is done.
// TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing.
//
let shard_status_key =
status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug()));
let shard_status: Option<importbucket_format::ShardStatus> =
storage.get_json(&shard_status_key).await?;
info!(?shard_status, "peeking shard status");
if shard_status.map(|st| st.done).unwrap_or(false) {
info!("shard status indicates that the shard is done, skipping import");
} else {
// TODO: checkpoint the progress into the IndexPart instead of restarting
// from the beginning.
//
// Wipe the slate clean - the flow does not allow resuming.
// We can implement resuming in the future by checkpointing the progress into the IndexPart.
//
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline.layers.write().await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it
);
let all_layers_keys = guard.all_persistent_layers();
let all_layers: Vec<_> = all_layers_keys
.iter()
.map(|key| guard.get_from_key(key))
.collect();
let open = guard.open_mut().context("open_mut")?;
timeline.remote_client.schedule_gc_update(&all_layers)?;
open.finish_gc_timeline(&all_layers);
}
//
// Wait for pgdata to finish uploading
//
info!("wait for pgdata to reach status 'done'");
let pgdata_status_key = status_prefix.join("pgdata");
loop {
let res = async {
let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
.get_json(&pgdata_status_key)
.await
.context("get pgdata status")?;
info!(?pgdata_status, "peeking pgdata status");
if pgdata_status.map(|st| st.done).unwrap_or(false) {
Ok(())
} else {
Err(anyhow::anyhow!("pgdata not done yet"))
}
}
.await;
match res {
Ok(_) => break,
Err(err) => {
info!(?err, "indefintely waiting for pgdata to finish");
if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
.await
.is_ok()
{
bail!("cancelled while waiting for pgdata");
}
}
}
}
//
// Do the import
//
info!("do the import");
let control_file = storage.get_control_file().await?;
let base_lsn = control_file.base_lsn();
info!("update TimelineMetadata based on LSNs from control file");
{
let pg_version = control_file.pg_version();
let _ctx: &RequestContext = ctx;
async move {
// FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
// checkpoint record, and prev_record_lsn should point to its beginning.
// We should read the real end of the record from the WAL, but here we
// just fake it.
let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
let prev_record_lsn = base_lsn;
let metadata = TimelineMetadata::new(
disk_consistent_lsn,
Some(prev_record_lsn),
None, // no ancestor
Lsn(0), // no ancestor lsn
base_lsn, // latest_gc_cutoff_lsn
base_lsn, // initdb_lsn
pg_version,
);
let _start_lsn = disk_consistent_lsn + 1;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
timeline.remote_client.wait_completion().await?;
anyhow::Ok(())
}
}
.await?;
flow::run(
timeline.clone(),
base_lsn,
control_file,
storage.clone(),
ctx,
)
.await?;
//
// Communicate that shard is done.
//
storage
.put_json(
&shard_status_key,
&importbucket_format::ShardStatus { done: true },
)
.await
.context("put shard status")?;
}
//
// Ensure at-least-once deliver of the upcall to cplane
// before we mark the task as done and never come here again.
//
info!("send final progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("final_progress_update"))
.await?;
//
// Mark as done in index_part.
// This makes subsequent timeline loads enter the normal load code path
// instead of spawning the import task and calling this here function.
//
info!("mark import as complete in index part");
timeline
.remote_client
.schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1(
index_part_format::V1::Done(index_part_format::Done {
idempotency_key,
started_at,
finished_at: chrono::Utc::now().naive_utc(),
}),
)))?;
timeline.remote_client.wait_completion().await?;
Ok(())
}

View File

@@ -1,798 +0,0 @@
//! Import a PGDATA directory into an empty root timeline.
//!
//! This module is adapted hackathon code by Heikki and Stas.
//! Other code in the parent module was written by Christian as part of a customer PoC.
//!
//! The hackathon code was producing image layer files as a free-standing program.
//!
//! It has been modified to
//! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
//! - => sharding-awareness: produce image layers with only the data relevant for this shard
//! - => S3 as the source for the PGDATA instead of local filesystem
//!
//! TODOs before productionization:
//! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
//! => produced image layers likely too small.
//! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
//! - asserts / unwraps need to be replaced with errors
//! - don't trust remote objects will be small (=prevent OOMs in those cases)
//! - limit all in-memory buffers in size, or download to disk and read from there
//! - limit task concurrency
//! - generally play nice with other tenants in the system
//! - importbucket is different bucket than main pageserver storage, so, should be fine wrt S3 rate limits
//! - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc
//! - integrate with layer eviction system
//! - audit for Tenant::cancel nor Timeline::cancel responsivity
//! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
//!
//! An incomplete set of TODOs from the Hackathon:
//! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
use std::sync::Arc;
use anyhow::{bail, ensure};
use bytes::Bytes;
use itertools::Itertools;
use pageserver_api::{
key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY},
reltag::RelTag,
shard::ShardIdentity,
};
use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, BLCKSZ};
use tokio::task::JoinSet;
use tracing::{debug, info_span, instrument, Instrument};
use crate::{
assert_u64_eq_usize::UsizeIsU64,
pgdatadir_mapping::{SlruSegmentDirectory, TwoPhaseDirectory},
};
use crate::{
context::{DownloadBehavior, RequestContext},
pgdatadir_mapping::{DbDirectory, RelDirectory},
task_mgr::TaskKind,
tenant::storage_layer::{ImageLayerWriter, Layer},
};
use pageserver_api::key::Key;
use pageserver_api::key::{
slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, CHECKPOINT_KEY, CONTROLFILE_KEY,
TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::singleton_range;
use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range};
use pageserver_api::reltag::SlruKind;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use std::collections::HashSet;
use std::ops::Range;
use super::{
importbucket_client::{ControlFile, RemoteStorageWrapper},
Timeline,
};
use remote_storage::RemotePath;
pub async fn run(
timeline: Arc<Timeline>,
pgdata_lsn: Lsn,
control_file: ControlFile,
storage: RemoteStorageWrapper,
ctx: &RequestContext,
) -> anyhow::Result<()> {
Flow {
timeline,
pgdata_lsn,
control_file,
tasks: Vec::new(),
storage,
}
.run(ctx)
.await
}
struct Flow {
timeline: Arc<Timeline>,
pgdata_lsn: Lsn,
control_file: ControlFile,
tasks: Vec<AnyImportTask>,
storage: RemoteStorageWrapper,
}
impl Flow {
/// Perform the ingestion into [`Self::timeline`].
/// Assumes the timeline is empty (= no layers).
pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
self.pgdata_lsn = pgdata_lsn;
let datadir = PgDataDir::new(&self.storage).await?;
// Import dbdir (00:00:00 keyspace)
// This is just constructed here, but will be written to the image layer in the first call to import_db()
let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
dbdirs: datadir
.dbs
.iter()
.map(|db| ((db.spcnode, db.dboid), true))
.collect(),
})?);
self.tasks
.push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
// Import databases (00:spcnode:dbnode keyspace for each db)
for db in datadir.dbs {
self.import_db(&db).await?;
}
// Import SLRUs
// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
.await?;
// pg_multixact/members (01:01 keyspace)
self.import_slru(
SlruKind::MultiXactMembers,
&self.storage.pgdata().join("pg_multixact/members"),
)
.await?;
// pg_multixact/offsets (01:02 keyspace)
self.import_slru(
SlruKind::MultiXactOffsets,
&self.storage.pgdata().join("pg_multixact/offsets"),
)
.await?;
// Import pg_twophase.
// TODO: as empty
let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
xids: HashSet::new(),
})?;
self.tasks
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
TWOPHASEDIR_KEY,
Bytes::from(twophasedir_buf),
)));
// Controlfile, checkpoint
self.tasks
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
CONTROLFILE_KEY,
self.control_file.control_file_buf().clone(),
)));
let checkpoint_buf = self
.control_file
.control_file_data()
.checkPointCopy
.encode()?;
self.tasks
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
CHECKPOINT_KEY,
checkpoint_buf,
)));
// Assigns parts of key space to later parallel jobs
let mut last_end_key = Key::MIN;
let mut current_chunk = Vec::new();
let mut current_chunk_size: usize = 0;
let mut parallel_jobs = Vec::new();
for task in std::mem::take(&mut self.tasks).into_iter() {
if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
let key_range = last_end_key..task.key_range().start;
parallel_jobs.push(ChunkProcessingJob::new(
key_range.clone(),
std::mem::take(&mut current_chunk),
&self,
));
last_end_key = key_range.end;
current_chunk_size = 0;
}
current_chunk_size += task.total_size();
current_chunk.push(task);
}
parallel_jobs.push(ChunkProcessingJob::new(
last_end_key..Key::MAX,
current_chunk,
&self,
));
// Start all jobs simultaneosly
let mut work = JoinSet::new();
// TODO: semaphore?
for job in parallel_jobs {
let ctx: RequestContext =
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
}
let mut results = Vec::new();
while let Some(result) = work.join_next().await {
match result {
Ok(res) => {
results.push(res);
}
Err(_joinset_err) => {
results.push(Err(anyhow::anyhow!(
"parallel job panicked or cancelled, check pageserver logs"
)));
}
}
}
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let mut msg = String::new();
for result in results {
if let Err(err) = result {
msg.push_str(&format!("{err:?}\n\n"));
}
}
bail!("Some parallel jobs failed:\n\n{msg}");
}
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> {
debug!("start");
scopeguard::defer! {
debug!("return");
}
// Import relmap (00:spcnode:dbnode:00:*:00)
let relmap_key = relmap_file_key(db.spcnode, db.dboid);
debug!("Constructing relmap entry, key {relmap_key}");
let relmap_path = db.path.join("pg_filenode.map");
let relmap_buf = self.storage.get(&relmap_path).await?;
self.tasks
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
relmap_key, relmap_buf,
)));
// Import reldir (00:spcnode:dbnode:00:*:01)
let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
debug!("Constructing reldirs entry, key {reldir_key}");
let reldir_buf = RelDirectory::ser(&RelDirectory {
rels: db
.files
.iter()
.map(|f| (f.rel_tag.relnode, f.rel_tag.forknum))
.collect(),
})?;
self.tasks
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
reldir_key,
Bytes::from(reldir_buf),
)));
// Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
// segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
for file in &db.files {
debug!(%file.path, %file.filesize, "importing file");
let len = file.filesize;
ensure!(len % 8192 == 0);
let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
let start_key = rel_block_to_key(file.rel_tag, start_blk);
let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
self.tasks
.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
*self.timeline.get_shard_identity(),
start_key..end_key,
&file.path,
self.storage.clone(),
)));
// Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
if let Some(nblocks) = file.nblocks {
let size_key = rel_size_to_key(file.rel_tag);
//debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
let buf = nblocks.to_le_bytes();
self.tasks
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
size_key,
Bytes::from(buf.to_vec()),
)));
}
}
Ok(())
}
async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
let segments = self.storage.listfilesindir(path).await?;
let segments: Vec<(String, u32, usize)> = segments
.into_iter()
.filter_map(|(path, size)| {
let filename = path.object_name()?;
let segno = u32::from_str_radix(filename, 16).ok()?;
Some((filename.to_string(), segno, size))
})
.collect();
// Write SlruDir
let slrudir_key = slru_dir_to_key(kind);
let segnos: HashSet<u32> = segments
.iter()
.map(|(_path, segno, _size)| *segno)
.collect();
let slrudir = SlruSegmentDirectory { segments: segnos };
let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
self.tasks
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
slrudir_key,
Bytes::from(slrudir_buf),
)));
for (segpath, segno, size) in segments {
// SlruSegBlocks for each segment
let p = path.join(&segpath);
let file_size = size;
ensure!(file_size % 8192 == 0);
let nblocks = u32::try_from(file_size / 8192)?;
let start_key = slru_block_to_key(kind, segno, 0);
let end_key = slru_block_to_key(kind, segno, nblocks);
debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
self.tasks
.push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
*self.timeline.get_shard_identity(),
start_key..end_key,
&p,
self.storage.clone(),
)));
// Followed by SlruSegSize
let segsize_key = slru_segment_size_to_key(kind, segno);
let segsize_buf = nblocks.to_le_bytes();
self.tasks
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
segsize_key,
Bytes::copy_from_slice(&segsize_buf),
)));
}
Ok(())
}
}
//
// dbdir iteration tools
//
struct PgDataDir {
pub dbs: Vec<PgDataDirDb>, // spcnode, dboid, path
}
struct PgDataDirDb {
pub spcnode: u32,
pub dboid: u32,
pub path: RemotePath,
pub files: Vec<PgDataDirDbFile>,
}
struct PgDataDirDbFile {
pub path: RemotePath,
pub rel_tag: RelTag,
pub segno: u32,
pub filesize: usize,
// Cummulative size of the given fork, set only for the last segment of that fork
pub nblocks: Option<usize>,
}
impl PgDataDir {
async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result<Self> {
let datadir_path = storage.pgdata();
// Import ordinary databases, DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID, so import them first
// Traverse database in increasing oid order
let basedir = &datadir_path.join("base");
let db_oids: Vec<_> = storage
.listdir(basedir)
.await?
.into_iter()
.filter_map(|path| path.object_name().and_then(|name| name.parse::<u32>().ok()))
.sorted()
.collect();
debug!(?db_oids, "found databases");
let mut databases = Vec::new();
for dboid in db_oids {
databases.push(
PgDataDirDb::new(
storage,
&basedir.join(dboid.to_string()),
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
&datadir_path,
)
.await?,
);
}
// special case for global catalogs
databases.push(
PgDataDirDb::new(
storage,
&datadir_path.join("global"),
postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
0,
&datadir_path,
)
.await?,
);
databases.sort_by_key(|db| (db.spcnode, db.dboid));
Ok(Self { dbs: databases })
}
}
impl PgDataDirDb {
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))]
async fn new(
storage: &RemoteStorageWrapper,
db_path: &RemotePath,
spcnode: u32,
dboid: u32,
datadir_path: &RemotePath,
) -> anyhow::Result<Self> {
let mut files: Vec<PgDataDirDbFile> = storage
.listfilesindir(db_path)
.await?
.into_iter()
.filter_map(|(path, size)| {
debug!(%path, %size, "found file in dbdir");
path.object_name().and_then(|name| {
// returns (relnode, forknum, segno)
parse_relfilename(name).ok().map(|x| (size, x))
})
})
.sorted_by_key(|(_, relfilename)| *relfilename)
.map(|(filesize, (relnode, forknum, segno))| {
let rel_tag = RelTag {
spcnode,
dbnode: dboid,
relnode,
forknum,
};
let path = datadir_path.join(rel_tag.to_segfile_name(segno));
assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
let nblocks = filesize / BLCKSZ as usize;
PgDataDirDbFile {
path,
filesize,
rel_tag,
segno,
nblocks: Some(nblocks), // first non-cummulative sizes
}
})
.collect();
// Set cummulative sizes. Do all of that math here, so that later we could easier
// parallelize over segments and know with which segments we need to write relsize
// entry.
let mut cumulative_nblocks: usize = 0;
let mut prev_rel_tag: Option<RelTag> = None;
for i in 0..files.len() {
if prev_rel_tag == Some(files[i].rel_tag) {
cumulative_nblocks += files[i].nblocks.unwrap();
} else {
cumulative_nblocks = files[i].nblocks.unwrap();
}
files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
Some(cumulative_nblocks)
} else {
None
};
prev_rel_tag = Some(files[i].rel_tag);
}
Ok(PgDataDirDb {
files,
path: db_path.clone(),
spcnode,
dboid,
})
}
}
trait ImportTask {
fn key_range(&self) -> Range<Key>;
fn total_size(&self) -> usize {
// TODO: revisit this
if is_contiguous_range(&self.key_range()) {
contiguous_range_len(&self.key_range()) as usize * 8192
} else {
u32::MAX as usize
}
}
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<usize>;
}
struct ImportSingleKeyTask {
key: Key,
buf: Bytes,
}
impl ImportSingleKeyTask {
fn new(key: Key, buf: Bytes) -> Self {
ImportSingleKeyTask { key, buf }
}
}
impl ImportTask for ImportSingleKeyTask {
fn key_range(&self) -> Range<Key> {
singleton_range(self.key)
}
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
layer_writer.put_image(self.key, self.buf, ctx).await?;
Ok(1)
}
}
struct ImportRelBlocksTask {
shard_identity: ShardIdentity,
key_range: Range<Key>,
path: RemotePath,
storage: RemoteStorageWrapper,
}
impl ImportRelBlocksTask {
fn new(
shard_identity: ShardIdentity,
key_range: Range<Key>,
path: &RemotePath,
storage: RemoteStorageWrapper,
) -> Self {
ImportRelBlocksTask {
shard_identity,
key_range,
path: path.clone(),
storage,
}
}
}
impl ImportTask for ImportRelBlocksTask {
fn key_range(&self) -> Range<Key> {
self.key_range.clone()
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))]
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
debug!("Importing relation file");
let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?;
assert_eq!(rel_tag, rel_tag_end);
let ranges = (start_blk..end_blk)
.enumerate()
.filter_map(|(i, blknum)| {
let key = rel_block_to_key(rel_tag, blknum);
if self.shard_identity.is_key_disposable(&key) {
return None;
}
let file_offset = i.checked_mul(8192).unwrap();
Some((
vec![key],
file_offset,
file_offset.checked_add(8192).unwrap(),
))
})
.coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| {
assert_eq!(key.len(), 1);
assert!(!acc.is_empty());
assert!(acc_end > acc_start);
if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
acc.push(key.pop().unwrap());
Ok((acc, acc_start, end))
} else {
Err(((acc, acc_start, acc_end), (key, start, end)))
}
});
let mut nimages = 0;
for (keys, range_start, range_end) in ranges {
let range_buf = self
.storage
.get_range(&self.path, range_start.into_u64(), range_end.into_u64())
.await?;
let mut buf = Bytes::from(range_buf);
// TODO: batched writes
for key in keys {
let image = buf.split_to(8192);
layer_writer.put_image(key, image, ctx).await?;
nimages += 1;
}
}
Ok(nimages)
}
}
struct ImportSlruBlocksTask {
shard_identity: ShardIdentity,
key_range: Range<Key>,
path: RemotePath,
storage: RemoteStorageWrapper,
}
impl ImportSlruBlocksTask {
fn new(
shard_identity: ShardIdentity,
key_range: Range<Key>,
path: &RemotePath,
storage: RemoteStorageWrapper,
) -> Self {
ImportSlruBlocksTask {
shard_identity,
key_range,
path: path.clone(),
storage,
}
}
}
impl ImportTask for ImportSlruBlocksTask {
fn key_range(&self) -> Range<Key> {
self.key_range.clone()
}
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
debug!("Importing SLRU segment file {}", self.path);
let buf = self.storage.get(&self.path).await?;
let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
let mut blknum = start_blk;
let mut nimages = 0;
let mut file_offset = 0;
while blknum < end_blk {
let key = slru_block_to_key(kind, segno, blknum);
assert!(
!self.shard_identity.is_key_disposable(&key),
"SLRU keys need to go into every shard"
);
let buf = &buf[file_offset..(file_offset + 8192)];
file_offset += 8192;
layer_writer
.put_image(key, Bytes::copy_from_slice(buf), ctx)
.await?;
blknum += 1;
nimages += 1;
}
Ok(nimages)
}
}
enum AnyImportTask {
SingleKey(ImportSingleKeyTask),
RelBlocks(ImportRelBlocksTask),
SlruBlocks(ImportSlruBlocksTask),
}
impl ImportTask for AnyImportTask {
fn key_range(&self) -> Range<Key> {
match self {
Self::SingleKey(t) => t.key_range(),
Self::RelBlocks(t) => t.key_range(),
Self::SlruBlocks(t) => t.key_range(),
}
}
/// returns the number of images put into the `layer_writer`
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
match self {
Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
}
}
}
impl From<ImportSingleKeyTask> for AnyImportTask {
fn from(t: ImportSingleKeyTask) -> Self {
Self::SingleKey(t)
}
}
impl From<ImportRelBlocksTask> for AnyImportTask {
fn from(t: ImportRelBlocksTask) -> Self {
Self::RelBlocks(t)
}
}
impl From<ImportSlruBlocksTask> for AnyImportTask {
fn from(t: ImportSlruBlocksTask) -> Self {
Self::SlruBlocks(t)
}
}
struct ChunkProcessingJob {
timeline: Arc<Timeline>,
range: Range<Key>,
tasks: Vec<AnyImportTask>,
pgdata_lsn: Lsn,
}
impl ChunkProcessingJob {
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
assert!(env.pgdata_lsn.is_valid());
Self {
timeline: env.timeline.clone(),
range,
tasks,
pgdata_lsn: env.pgdata_lsn,
}
}
async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
let mut writer = ImageLayerWriter::new(
self.timeline.conf,
self.timeline.timeline_id,
self.timeline.tenant_shard_id,
&self.range,
self.pgdata_lsn,
ctx,
)
.await?;
let mut nimages = 0;
for task in self.tasks {
nimages += task.doit(&mut writer, ctx).await?;
}
let resident_layer = if nimages > 0 {
let (desc, path) = writer.finish(ctx).await?;
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
} else {
// dropping the writer cleans up
return Ok(());
};
// this is sharing the same code as create_image_layers
let mut guard = self.timeline.layers.write().await;
guard
.open_mut()?
.track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
crate::tenant::timeline::drop_wlock(guard);
// Schedule the layer for upload but don't add barriers such as
// wait for completion or index upload, so we don't inhibit upload parallelism.
// TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
// TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level.
self.timeline
.remote_client
.schedule_layer_file_upload(resident_layer)?;
Ok(())
}
}

View File

@@ -1,315 +0,0 @@
use std::{ops::Bound, sync::Arc};
use anyhow::Context;
use bytes::Bytes;
use postgres_ffi::ControlFileData;
use remote_storage::{
Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingObject, RemotePath,
};
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
use utils::lsn::Lsn;
use crate::{assert_u64_eq_usize::U64IsUsize, config::PageServerConf};
use super::{importbucket_format, index_part_format};
pub async fn new(
conf: &'static PageServerConf,
location: &index_part_format::Location,
cancel: CancellationToken,
) -> Result<RemoteStorageWrapper, anyhow::Error> {
// FIXME: we probably want some timeout, and we might be able to assume the max file
// size on S3 is 1GiB (postgres segment size). But the problem is that the individual
// downloaders don't know enough about concurrent downloads to make a guess on the
// expected bandwidth and resulting best timeout.
let timeout = std::time::Duration::from_secs(24 * 60 * 60);
let location_storage = match location {
#[cfg(feature = "testing")]
index_part_format::Location::LocalFs { path } => {
GenericRemoteStorage::LocalFs(remote_storage::LocalFs::new(path.clone(), timeout)?)
}
index_part_format::Location::AwsS3 {
region,
bucket,
key,
} => {
// TODO: think about security implications of letting the client specify the bucket & prefix.
// It's the most flexible right now, but, possibly we want to move bucket name into PS conf
// and force the timeline_id into the prefix?
GenericRemoteStorage::AwsS3(Arc::new(
remote_storage::S3Bucket::new(
&remote_storage::S3Config {
bucket_name: bucket.clone(),
prefix_in_bucket: Some(key.clone()),
bucket_region: region.clone(),
endpoint: conf
.import_pgdata_aws_endpoint_url
.clone()
.map(|url| url.to_string()), // by specifying None here, remote_storage/aws-sdk-rust will infer from env
concurrency_limit: 100.try_into().unwrap(), // TODO: think about this
max_keys_per_list_response: Some(1000), // TODO: think about this
upload_storage_class: None, // irrelevant
},
timeout,
)
.await
.context("setup s3 bucket")?,
))
}
};
let storage_wrapper = RemoteStorageWrapper::new(location_storage, cancel);
Ok(storage_wrapper)
}
/// Wrap [`remote_storage`] APIs to make it look a bit more like a filesystem API
/// such as [`tokio::fs`], which was used in the original implementation of the import code.
#[derive(Clone)]
pub struct RemoteStorageWrapper {
storage: GenericRemoteStorage,
cancel: CancellationToken,
}
impl RemoteStorageWrapper {
pub fn new(storage: GenericRemoteStorage, cancel: CancellationToken) -> Self {
Self { storage, cancel }
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn listfilesindir(
&self,
path: &RemotePath,
) -> Result<Vec<(RemotePath, usize)>, DownloadError> {
assert!(
path.object_name().is_some(),
"must specify dirname, without trailing slash"
);
let path = path.add_trailing_slash();
let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
|| async {
let Listing { keys, prefixes: _ } = self
.storage
.list(
Some(&path),
remote_storage::ListingMode::WithDelimiter,
None,
&self.cancel,
)
.await?;
let res = keys
.into_iter()
.map(|ListingObject { key, size, .. }| (key, size.into_usize()))
.collect();
Ok(res)
},
&format!("listfilesindir {path:?}"),
&self.cancel,
)
.await;
debug!(?res, "returning");
res
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn listdir(&self, path: &RemotePath) -> Result<Vec<RemotePath>, DownloadError> {
assert!(
path.object_name().is_some(),
"must specify dirname, without trailing slash"
);
let path = path.add_trailing_slash();
let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
|| async {
let Listing { keys, prefixes } = self
.storage
.list(
Some(&path),
remote_storage::ListingMode::WithDelimiter,
None,
&self.cancel,
)
.await?;
let res = keys
.into_iter()
.map(|ListingObject { key, .. }| key)
.chain(prefixes.into_iter())
.collect();
Ok(res)
},
&format!("listdir {path:?}"),
&self.cancel,
)
.await;
debug!(?res, "returning");
res
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn get(&self, path: &RemotePath) -> Result<Bytes, DownloadError> {
let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
|| async {
let Download {
download_stream, ..
} = self
.storage
.download(path, &DownloadOpts::default(), &self.cancel)
.await?;
let mut reader = tokio_util::io::StreamReader::new(download_stream);
// XXX optimize this, can we get the capacity hint from somewhere?
let mut buf = Vec::new();
tokio::io::copy_buf(&mut reader, &mut buf).await?;
Ok(Bytes::from(buf))
},
&format!("download {path:?}"),
&self.cancel,
)
.await;
debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
res
}
pub async fn get_spec(&self) -> Result<Option<importbucket_format::Spec>, anyhow::Error> {
self.get_json(&RemotePath::from_string("spec.json").unwrap())
.await
.context("get spec")
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn get_json<T: DeserializeOwned>(
&self,
path: &RemotePath,
) -> Result<Option<T>, DownloadError> {
let buf = match self.get(path).await {
Ok(buf) => buf,
Err(DownloadError::NotFound) => return Ok(None),
Err(err) => return Err(err),
};
let res = serde_json::from_slice(&buf)
.context("serialize")
// TODO: own error type
.map_err(DownloadError::Other)?;
Ok(Some(res))
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn put_json<T>(&self, path: &RemotePath, value: &T) -> anyhow::Result<()>
where
T: serde::Serialize,
{
let buf = serde_json::to_vec(value)?;
let bytes = Bytes::from(buf);
utils::backoff::retry(
|| async {
let size = bytes.len();
let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
self.storage
.upload_storage_object(bytes, size, path, &self.cancel)
.await
},
remote_storage::TimeoutOrCancel::caused_by_cancel,
1,
u32::MAX,
&format!("put json {path}"),
&self.cancel,
)
.await
.expect("practically infinite retries")
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn get_range(
&self,
path: &RemotePath,
start_inclusive: u64,
end_exclusive: u64,
) -> Result<Vec<u8>, DownloadError> {
let len = end_exclusive
.checked_sub(start_inclusive)
.unwrap()
.into_usize();
let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
|| async {
let Download {
download_stream, ..
} = self
.storage
.download(
path,
&DownloadOpts {
etag: None,
byte_start: Bound::Included(start_inclusive),
byte_end: Bound::Excluded(end_exclusive)
},
&self.cancel)
.await?;
let mut reader = tokio_util::io::StreamReader::new(download_stream);
let mut buf = Vec::with_capacity(len);
tokio::io::copy_buf(&mut reader, &mut buf).await?;
Ok(buf)
},
&format!("download range len=0x{len:x} [0x{start_inclusive:x},0x{end_exclusive:x}) from {path:?}"),
&self.cancel,
)
.await;
debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
res
}
pub fn pgdata(&self) -> RemotePath {
RemotePath::from_string("pgdata").unwrap()
}
pub async fn get_control_file(&self) -> Result<ControlFile, anyhow::Error> {
let control_file_path = self.pgdata().join("global/pg_control");
info!("get control file from {control_file_path}");
let control_file_buf = self.get(&control_file_path).await?;
ControlFile::new(control_file_buf)
}
}
pub struct ControlFile {
control_file_data: ControlFileData,
control_file_buf: Bytes,
}
impl ControlFile {
pub(crate) fn new(control_file_buf: Bytes) -> Result<Self, anyhow::Error> {
// XXX ControlFileData is version-specific, we're always using v14 here. v17 had changes.
let control_file_data = ControlFileData::decode(&control_file_buf)?;
let control_file = ControlFile {
control_file_data,
control_file_buf,
};
control_file.try_pg_version()?; // so that we can offer infallible pg_version()
Ok(control_file)
}
pub(crate) fn base_lsn(&self) -> Lsn {
Lsn(self.control_file_data.checkPoint).align()
}
pub(crate) fn pg_version(&self) -> u32 {
self.try_pg_version()
.expect("prepare() checks that try_pg_version doesn't error")
}
pub(crate) fn control_file_data(&self) -> &ControlFileData {
&self.control_file_data
}
pub(crate) fn control_file_buf(&self) -> &Bytes {
&self.control_file_buf
}
fn try_pg_version(&self) -> anyhow::Result<u32> {
Ok(match self.control_file_data.catalog_version_no {
// thesea are from catversion.h
202107181 => 14,
202209061 => 15,
202307071 => 16,
/* XXX pg17 */
catversion => {
anyhow::bail!("unrecognized catalog version {catversion}")
}
})
}
}

View File

@@ -1,20 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct PgdataStatus {
pub done: bool,
// TODO: remaining fields
}
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct ShardStatus {
pub done: bool,
// TODO: remaining fields
}
// TODO: dedupe with fast_import code
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct Spec {
pub project_id: String,
pub branch_id: String,
}

View File

@@ -1,68 +0,0 @@
use serde::{Deserialize, Serialize};
#[cfg(feature = "testing")]
use camino::Utf8PathBuf;
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum Root {
V1(V1),
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum V1 {
InProgress(InProgress),
Done(Done),
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(transparent)]
pub struct IdempotencyKey(String);
impl IdempotencyKey {
pub fn new(s: String) -> Self {
Self(s)
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct InProgress {
pub idempotency_key: IdempotencyKey,
pub location: Location,
pub started_at: chrono::NaiveDateTime,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Done {
pub idempotency_key: IdempotencyKey,
pub started_at: chrono::NaiveDateTime,
pub finished_at: chrono::NaiveDateTime,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum Location {
#[cfg(feature = "testing")]
LocalFs { path: Utf8PathBuf },
AwsS3 {
region: String,
bucket: String,
key: String,
},
}
impl Root {
pub fn is_done(&self) -> bool {
match self {
Root::V1(v1) => match v1 {
V1::Done(_) => true,
V1::InProgress(_) => false,
},
}
}
pub fn idempotency_key(&self) -> &IdempotencyKey {
match self {
Root::V1(v1) => match v1 {
V1::InProgress(in_progress) => &in_progress.idempotency_key,
V1::Done(done) => &done.idempotency_key,
},
}
}
}

View File

@@ -1,119 +0,0 @@
//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::error;
use crate::config::PageServerConf;
use reqwest::Method;
use super::importbucket_format::Spec;
pub struct Client {
base_url: String,
authorization_header: Option<String>,
client: reqwest::Client,
cancel: CancellationToken,
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressRequest {
// no fields yet, not sure if there every will be any
}
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressResponse {
// we don't care
}
impl Client {
pub fn new(conf: &PageServerConf, cancel: CancellationToken) -> anyhow::Result<Self> {
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
anyhow::bail!("import_pgdata_upcall_api is not configured")
};
Ok(Self {
base_url: base_url.to_string(),
client: reqwest::Client::new(),
cancel,
authorization_header: conf
.import_pgdata_upcall_api_token
.as_ref()
.map(|secret_string| secret_string.get_contents())
.map(|jwt| format!("Bearer {jwt}")),
})
}
fn start_request<U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
) -> reqwest::RequestBuilder {
let req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
}
}
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.start_request(method, uri)
.json(&body)
.send()
.await
.map_err(Error::ReceiveBody)
}
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
Ok(response)
}
pub async fn send_progress_once(&self, spec: &Spec) -> Result<()> {
let url = format!(
"{}/projects/{}/branches/{}/import_progress",
self.base_url, spec.project_id, spec.branch_id
);
let ImportProgressResponse {} = self
.request(Method::POST, url, &ImportProgressRequest {})
.await?
.json()
.await
.map_err(Error::ReceiveBody)?;
Ok(())
}
pub async fn send_progress_until_success(&self, spec: &Spec) -> anyhow::Result<()> {
loop {
match self.send_progress_once(spec).await {
Ok(()) => return Ok(()),
Err(Error::Cancelled) => return Err(anyhow::anyhow!("cancelled")),
Err(err) => {
error!(?err, "error sending progress, retrying");
if tokio::time::timeout(
std::time::Duration::from_secs(10),
self.cancel.cancelled(),
)
.await
.is_ok()
{
anyhow::bail!("cancelled while sending early progress update");
}
}
}
}
}
}

View File

@@ -3,7 +3,7 @@ use std::{collections::hash_map::Entry, fs, sync::Arc};
use anyhow::Context;
use camino::Utf8PathBuf;
use tracing::{error, info, info_span};
use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
use utils::{fs_ext, id::TimelineId, lsn::Lsn};
use crate::{
context::RequestContext,
@@ -23,14 +23,14 @@ use super::Timeline;
pub struct UninitializedTimeline<'t> {
pub(crate) owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard<'t>)>,
}
impl<'t> UninitializedTimeline<'t> {
pub(crate) fn new(
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard<'t>)>,
) -> Self {
Self {
owning_tenant,
@@ -87,10 +87,6 @@ impl<'t> UninitializedTimeline<'t> {
}
}
pub(crate) fn finish_creation_myself(&mut self) -> (Arc<Timeline>, TimelineCreateGuard) {
self.raw_timeline.take().expect("already checked")
}
/// Prepares timeline data by loading it from the basebackup archive.
pub(crate) async fn import_basebackup_from_tar(
self,
@@ -171,10 +167,9 @@ pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
/// A guard for timeline creations in process: as long as this object exists, the timeline ID
/// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline.
#[must_use]
pub(crate) struct TimelineCreateGuard {
pub(crate) _tenant_gate_guard: GateGuard,
pub(crate) owning_tenant: Arc<Tenant>,
pub(crate) timeline_id: TimelineId,
pub(crate) struct TimelineCreateGuard<'t> {
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
pub(crate) timeline_path: Utf8PathBuf,
pub(crate) idempotency: CreateTimelineIdempotency,
}
@@ -189,27 +184,20 @@ pub(crate) enum TimelineExclusionError {
},
#[error("Already creating")]
AlreadyCreating,
#[error("Shutting down")]
ShuttingDown,
// e.g. I/O errors, or some failure deep in postgres initdb
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl TimelineCreateGuard {
impl<'t> TimelineCreateGuard<'t> {
pub(crate) fn new(
owning_tenant: &Arc<Tenant>,
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
timeline_path: Utf8PathBuf,
idempotency: CreateTimelineIdempotency,
allow_offloaded: bool,
) -> Result<Self, TimelineExclusionError> {
let _tenant_gate_guard = owning_tenant
.gate
.enter()
.map_err(|_| TimelineExclusionError::ShuttingDown)?;
// Lock order: this is the only place we take both locks. During drop() we only
// lock creating_timelines
let timelines = owning_tenant.timelines.lock().unwrap();
@@ -237,12 +225,8 @@ impl TimelineCreateGuard {
return Err(TimelineExclusionError::AlreadyCreating);
}
creating_timelines.insert(timeline_id);
drop(creating_timelines);
drop(timelines_offloaded);
drop(timelines);
Ok(Self {
_tenant_gate_guard,
owning_tenant: Arc::clone(owning_tenant),
owning_tenant,
timeline_id,
timeline_path,
idempotency,
@@ -250,7 +234,7 @@ impl TimelineCreateGuard {
}
}
impl Drop for TimelineCreateGuard {
impl Drop for TimelineCreateGuard<'_> {
fn drop(&mut self) {
self.owning_tenant
.timelines_creating

View File

@@ -3,7 +3,6 @@ use super::storage_layer::ResidentLayer;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
@@ -15,6 +14,7 @@ use utils::lsn::AtomicLsn;
use std::sync::atomic::AtomicU32;
use utils::lsn::Lsn;
#[cfg(feature = "testing")]
use utils::generation::Generation;
// clippy warns that Uninitialized is much smaller than Initialized, which wastes
@@ -38,12 +38,6 @@ impl UploadQueue {
}
}
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub(crate) enum OpType {
MayReorder,
FlushDeletion,
}
/// This keeps track of queued and in-progress tasks.
pub(crate) struct UploadQueueInitialized {
/// Counter to assign task IDs
@@ -74,6 +68,7 @@ pub(crate) struct UploadQueueInitialized {
pub(crate) num_inprogress_layer_uploads: usize,
pub(crate) num_inprogress_metadata_uploads: usize,
pub(crate) num_inprogress_deletions: usize,
pub(crate) num_inprogress_barriers: usize,
/// Tasks that are currently in-progress. In-progress means that a tokio Task
/// has been launched for it. An in-progress task can be busy uploading, but it can
@@ -94,9 +89,6 @@ pub(crate) struct UploadQueueInitialized {
#[cfg(feature = "testing")]
pub(crate) dangling_files: HashMap<LayerName, Generation>,
/// Ensure we order file operations correctly.
pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,
/// Deletions that are blocked by the tenant configuration
pub(crate) blocked_deletions: Vec<Delete>,
@@ -188,11 +180,11 @@ impl UploadQueue {
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
num_inprogress_barriers: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
recently_deleted: HashSet::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
@@ -230,11 +222,11 @@ impl UploadQueue {
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
num_inprogress_barriers: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
recently_deleted: HashSet::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
@@ -291,10 +283,18 @@ pub(crate) struct Delete {
pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
}
#[derive(Debug)]
pub(crate) enum BarrierType {
/// Barrier is a normal barrier, not an initial barrier.
Normal,
/// Barrier is an initial barrier, scheduled at timeline load.
Initial,
}
#[derive(Debug)]
pub(crate) enum UploadOp {
/// Upload a layer file. The last field indicates the last operation for thie file.
UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
/// Upload a layer file
UploadLayer(ResidentLayer, LayerFileMetadata),
/// Upload a index_part.json file
UploadMetadata {
@@ -306,7 +306,10 @@ pub(crate) enum UploadOp {
Delete(Delete),
/// Barrier. When the barrier operation is reached, the channel is closed.
Barrier(tokio::sync::watch::Sender<()>),
/// The boolean value indicates whether the barrier is an initial barrier scheduled
/// at timeline load -- if yes, we will need to wait for all deletions to be completed
/// before the next upload.
Barrier(tokio::sync::watch::Sender<()>, BarrierType),
/// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
/// this is the same as a Barrier.
@@ -316,11 +319,11 @@ pub(crate) enum UploadOp {
impl std::fmt::Display for UploadOp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
UploadOp::UploadLayer(layer, metadata, mode) => {
UploadOp::UploadLayer(layer, metadata) => {
write!(
f,
"UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
layer, metadata.file_size, metadata.generation, mode
"UploadLayer({}, size={:?}, gen={:?})",
layer, metadata.file_size, metadata.generation
)
}
UploadOp::UploadMetadata { uploaded, .. } => {
@@ -333,7 +336,8 @@ impl std::fmt::Display for UploadOp {
UploadOp::Delete(delete) => {
write!(f, "Delete({} layers)", delete.layers.len())
}
UploadOp::Barrier(_) => write!(f, "Barrier"),
UploadOp::Barrier(_, BarrierType::Normal) => write!(f, "Barrier"),
UploadOp::Barrier(_, BarrierType::Initial) => write!(f, "Barrier (initial)"),
UploadOp::Shutdown => write!(f, "Shutdown"),
}
}

69
poetry.lock generated
View File

@@ -1858,54 +1858,47 @@ files = [
[[package]]
name = "mypy"
version = "1.13.0"
version = "1.3.0"
description = "Optional static typing for Python"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.7"
files = [
{file = "mypy-1.13.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:6607e0f1dd1fb7f0aca14d936d13fd19eba5e17e1cd2a14f808fa5f8f6d8f60a"},
{file = "mypy-1.13.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8a21be69bd26fa81b1f80a61ee7ab05b076c674d9b18fb56239d72e21d9f4c80"},
{file = "mypy-1.13.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7b2353a44d2179846a096e25691d54d59904559f4232519d420d64da6828a3a7"},
{file = "mypy-1.13.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:0730d1c6a2739d4511dc4253f8274cdd140c55c32dfb0a4cf8b7a43f40abfa6f"},
{file = "mypy-1.13.0-cp310-cp310-win_amd64.whl", hash = "sha256:c5fc54dbb712ff5e5a0fca797e6e0aa25726c7e72c6a5850cfd2adbc1eb0a372"},
{file = "mypy-1.13.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:581665e6f3a8a9078f28d5502f4c334c0c8d802ef55ea0e7276a6e409bc0d82d"},
{file = "mypy-1.13.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3ddb5b9bf82e05cc9a627e84707b528e5c7caaa1c55c69e175abb15a761cec2d"},
{file = "mypy-1.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:20c7ee0bc0d5a9595c46f38beb04201f2620065a93755704e141fcac9f59db2b"},
{file = "mypy-1.13.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:3790ded76f0b34bc9c8ba4def8f919dd6a46db0f5a6610fb994fe8efdd447f73"},
{file = "mypy-1.13.0-cp311-cp311-win_amd64.whl", hash = "sha256:51f869f4b6b538229c1d1bcc1dd7d119817206e2bc54e8e374b3dfa202defcca"},
{file = "mypy-1.13.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:5c7051a3461ae84dfb5dd15eff5094640c61c5f22257c8b766794e6dd85e72d5"},
{file = "mypy-1.13.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:39bb21c69a5d6342f4ce526e4584bc5c197fd20a60d14a8624d8743fffb9472e"},
{file = "mypy-1.13.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:164f28cb9d6367439031f4c81e84d3ccaa1e19232d9d05d37cb0bd880d3f93c2"},
{file = "mypy-1.13.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:a4c1bfcdbce96ff5d96fc9b08e3831acb30dc44ab02671eca5953eadad07d6d0"},
{file = "mypy-1.13.0-cp312-cp312-win_amd64.whl", hash = "sha256:a0affb3a79a256b4183ba09811e3577c5163ed06685e4d4b46429a271ba174d2"},
{file = "mypy-1.13.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:a7b44178c9760ce1a43f544e595d35ed61ac2c3de306599fa59b38a6048e1aa7"},
{file = "mypy-1.13.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:5d5092efb8516d08440e36626f0153b5006d4088c1d663d88bf79625af3d1d62"},
{file = "mypy-1.13.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:de2904956dac40ced10931ac967ae63c5089bd498542194b436eb097a9f77bc8"},
{file = "mypy-1.13.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:7bfd8836970d33c2105562650656b6846149374dc8ed77d98424b40b09340ba7"},
{file = "mypy-1.13.0-cp313-cp313-win_amd64.whl", hash = "sha256:9f73dba9ec77acb86457a8fc04b5239822df0c14a082564737833d2963677dbc"},
{file = "mypy-1.13.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:100fac22ce82925f676a734af0db922ecfea991e1d7ec0ceb1e115ebe501301a"},
{file = "mypy-1.13.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7bcb0bb7f42a978bb323a7c88f1081d1b5dee77ca86f4100735a6f541299d8fb"},
{file = "mypy-1.13.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bde31fc887c213e223bbfc34328070996061b0833b0a4cfec53745ed61f3519b"},
{file = "mypy-1.13.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:07de989f89786f62b937851295ed62e51774722e5444a27cecca993fc3f9cd74"},
{file = "mypy-1.13.0-cp38-cp38-win_amd64.whl", hash = "sha256:4bde84334fbe19bad704b3f5b78c4abd35ff1026f8ba72b29de70dda0916beb6"},
{file = "mypy-1.13.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0246bcb1b5de7f08f2826451abd947bf656945209b140d16ed317f65a17dc7dc"},
{file = "mypy-1.13.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:7f5b7deae912cf8b77e990b9280f170381fdfbddf61b4ef80927edd813163732"},
{file = "mypy-1.13.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7029881ec6ffb8bc233a4fa364736789582c738217b133f1b55967115288a2bc"},
{file = "mypy-1.13.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:3e38b980e5681f28f033f3be86b099a247b13c491f14bb8b1e1e134d23bb599d"},
{file = "mypy-1.13.0-cp39-cp39-win_amd64.whl", hash = "sha256:a6789be98a2017c912ae6ccb77ea553bbaf13d27605d2ca20a76dfbced631b24"},
{file = "mypy-1.13.0-py3-none-any.whl", hash = "sha256:9c250883f9fd81d212e0952c92dbfcc96fc237f4b7c92f56ac81fd48460b3e5a"},
{file = "mypy-1.13.0.tar.gz", hash = "sha256:0291a61b6fbf3e6673e3405cfcc0e7650bebc7939659fdca2702958038bd835e"},
{file = "mypy-1.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c1eb485cea53f4f5284e5baf92902cd0088b24984f4209e25981cc359d64448d"},
{file = "mypy-1.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4c99c3ecf223cf2952638da9cd82793d8f3c0c5fa8b6ae2b2d9ed1e1ff51ba85"},
{file = "mypy-1.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:550a8b3a19bb6589679a7c3c31f64312e7ff482a816c96e0cecec9ad3a7564dd"},
{file = "mypy-1.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:cbc07246253b9e3d7d74c9ff948cd0fd7a71afcc2b77c7f0a59c26e9395cb152"},
{file = "mypy-1.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:a22435632710a4fcf8acf86cbd0d69f68ac389a3892cb23fbad176d1cddaf228"},
{file = "mypy-1.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6e33bb8b2613614a33dff70565f4c803f889ebd2f859466e42b46e1df76018dd"},
{file = "mypy-1.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7d23370d2a6b7a71dc65d1266f9a34e4cde9e8e21511322415db4b26f46f6b8c"},
{file = "mypy-1.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:658fe7b674769a0770d4b26cb4d6f005e88a442fe82446f020be8e5f5efb2fae"},
{file = "mypy-1.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6e42d29e324cdda61daaec2336c42512e59c7c375340bd202efa1fe0f7b8f8ca"},
{file = "mypy-1.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:d0b6c62206e04061e27009481cb0ec966f7d6172b5b936f3ead3d74f29fe3dcf"},
{file = "mypy-1.3.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:76ec771e2342f1b558c36d49900dfe81d140361dd0d2df6cd71b3db1be155409"},
{file = "mypy-1.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ebc95f8386314272bbc817026f8ce8f4f0d2ef7ae44f947c4664efac9adec929"},
{file = "mypy-1.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:faff86aa10c1aa4a10e1a301de160f3d8fc8703b88c7e98de46b531ff1276a9a"},
{file = "mypy-1.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:8c5979d0deb27e0f4479bee18ea0f83732a893e81b78e62e2dda3e7e518c92ee"},
{file = "mypy-1.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c5d2cc54175bab47011b09688b418db71403aefad07cbcd62d44010543fc143f"},
{file = "mypy-1.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:87df44954c31d86df96c8bd6e80dfcd773473e877ac6176a8e29898bfb3501cb"},
{file = "mypy-1.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:473117e310febe632ddf10e745a355714e771ffe534f06db40702775056614c4"},
{file = "mypy-1.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:74bc9b6e0e79808bf8678d7678b2ae3736ea72d56eede3820bd3849823e7f305"},
{file = "mypy-1.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:44797d031a41516fcf5cbfa652265bb994e53e51994c1bd649ffcd0c3a7eccbf"},
{file = "mypy-1.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:ddae0f39ca146972ff6bb4399f3b2943884a774b8771ea0a8f50e971f5ea5ba8"},
{file = "mypy-1.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:1c4c42c60a8103ead4c1c060ac3cdd3ff01e18fddce6f1016e08939647a0e703"},
{file = "mypy-1.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e86c2c6852f62f8f2b24cb7a613ebe8e0c7dc1402c61d36a609174f63e0ff017"},
{file = "mypy-1.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:f9dca1e257d4cc129517779226753dbefb4f2266c4eaad610fc15c6a7e14283e"},
{file = "mypy-1.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:95d8d31a7713510685b05fbb18d6ac287a56c8f6554d88c19e73f724a445448a"},
{file = "mypy-1.3.0-py3-none-any.whl", hash = "sha256:a8763e72d5d9574d45ce5881962bc8e9046bf7b375b0abf031f3e6811732a897"},
{file = "mypy-1.3.0.tar.gz", hash = "sha256:e1f4d16e296f5135624b34e8fb741eb0eadedca90862405b1f1fde2040b9bd11"},
]
[package.dependencies]
mypy-extensions = ">=1.0.0"
typing-extensions = ">=4.6.0"
typing-extensions = ">=3.10"
[package.extras]
dmypy = ["psutil (>=4.0)"]
faster-cache = ["orjson"]
install-types = ["pip"]
mypyc = ["setuptools (>=50)"]
python2 = ["typed-ast (>=1.4.0,<2)"]
reports = ["lxml"]
[[package]]
@@ -3524,4 +3517,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "21debe1116843e5d14bdf37d6e265c68c63a98a64ba04ec8b8a02af2e8d9f486"
content-hash = "5a9b8c8d409acb840c0a94dcdec6aac9777ccec443d74c78dbd511fa223cd6f6"

View File

@@ -6,7 +6,6 @@ use tokio_postgres::config::SslMode;
use tracing::{info, info_span};
use super::ComputeCredentialKeys;
use crate::auth::IpPattern;
use crate::cache::Cached;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
@@ -75,10 +74,10 @@ impl ConsoleRedirectBackend {
ctx: &RequestContext,
auth_config: &'static AuthenticationConfig,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<(ConsoleRedirectNodeInfo, Option<Vec<IpPattern>>)> {
) -> auth::Result<ConsoleRedirectNodeInfo> {
authenticate(ctx, auth_config, &self.console_uri, client)
.await
.map(|(node_info, ip_allowlist)| (ConsoleRedirectNodeInfo(node_info), ip_allowlist))
.map(ConsoleRedirectNodeInfo)
}
}
@@ -103,7 +102,7 @@ async fn authenticate(
auth_config: &'static AuthenticationConfig,
link_uri: &reqwest::Url,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<(NodeInfo, Option<Vec<IpPattern>>)> {
) -> auth::Result<NodeInfo> {
ctx.set_auth_method(crate::context::AuthMethod::ConsoleRedirect);
// registering waiter can fail if we get unlucky with rng.
@@ -177,12 +176,9 @@ async fn authenticate(
config.password(password.as_ref());
}
Ok((
NodeInfo {
config,
aux: db_info.aux,
allow_self_signed_compute: false, // caller may override
},
db_info.allowed_ips,
))
Ok(NodeInfo {
config,
aux: db_info.aux,
allow_self_signed_compute: false, // caller may override
})
}

View File

@@ -132,93 +132,6 @@ struct JwkSet<'a> {
keys: Vec<&'a RawValue>,
}
/// Given a jwks_url, fetch the JWKS and parse out all the signing JWKs.
/// Returns `None` and log a warning if there are any errors.
async fn fetch_jwks(
client: &reqwest_middleware::ClientWithMiddleware,
jwks_url: url::Url,
) -> Option<jose_jwk::JwkSet> {
let req = client.get(jwks_url.clone());
// TODO(conrad): We need to filter out URLs that point to local resources. Public internet only.
let resp = req.send().await.and_then(|r| {
r.error_for_status()
.map_err(reqwest_middleware::Error::Reqwest)
});
let resp = match resp {
Ok(r) => r,
// TODO: should we re-insert JWKs if we want to keep this JWKs URL?
// I expect these failures would be quite sparse.
Err(e) => {
tracing::warn!(url=?jwks_url, error=?e, "could not fetch JWKs");
return None;
}
};
let resp: http::Response<reqwest::Body> = resp.into();
let bytes = match read_body_with_limit(resp.into_body(), MAX_JWK_BODY_SIZE).await {
Ok(bytes) => bytes,
Err(e) => {
tracing::warn!(url=?jwks_url, error=?e, "could not decode JWKs");
return None;
}
};
let jwks = match serde_json::from_slice::<JwkSet>(&bytes) {
Ok(jwks) => jwks,
Err(e) => {
tracing::warn!(url=?jwks_url, error=?e, "could not decode JWKs");
return None;
}
};
// `jose_jwk::Jwk` is quite large (288 bytes). Let's not pre-allocate for what we don't need.
//
// Even though we limit our responses to 64KiB, we could still receive a payload like
// `{"keys":[` + repeat(`0`).take(30000).join(`,`) + `]}`. Parsing this as `RawValue` uses 468KiB.
// Pre-allocating the corresponding `Vec::<jose_jwk::Jwk>::with_capacity(30000)` uses 8.2MiB.
let mut keys = vec![];
let mut failed = 0;
for key in jwks.keys {
let key = match serde_json::from_str::<jose_jwk::Jwk>(key.get()) {
Ok(key) => key,
Err(e) => {
tracing::debug!(url=?jwks_url, failed=?e, "could not decode JWK");
failed += 1;
continue;
}
};
// if `use` (called `cls` in rust) is specified to be something other than signing,
// we can skip storing it.
if key
.prm
.cls
.as_ref()
.is_some_and(|c| *c != jose_jwk::Class::Signing)
{
continue;
}
keys.push(key);
}
keys.shrink_to_fit();
if failed > 0 {
tracing::warn!(url=?jwks_url, failed, "could not decode JWKs");
}
if keys.is_empty() {
tracing::warn!(url=?jwks_url, "no valid JWKs found inside the response body");
return None;
}
Some(jose_jwk::JwkSet { keys })
}
impl JwkCacheEntryLock {
async fn acquire_permit<'a>(self: &'a Arc<Self>) -> JwkRenewalPermit<'a> {
JwkRenewalPermit::acquire_permit(self).await
@@ -253,15 +166,87 @@ impl JwkCacheEntryLock {
// TODO(conrad): run concurrently
// TODO(conrad): strip the JWKs urls (should be checked by cplane as well - cloud#16284)
for rule in rules {
if let Some(jwks) = fetch_jwks(client, rule.jwks_url).await {
key_sets.insert(
rule.id,
KeySet {
jwks,
audience: rule.audience,
role_names: rule.role_names,
},
);
let req = client.get(rule.jwks_url.clone());
// TODO(conrad): eventually switch to using reqwest_middleware/`new_client_with_timeout`.
// TODO(conrad): We need to filter out URLs that point to local resources. Public internet only.
match req.send().await.and_then(|r| {
r.error_for_status()
.map_err(reqwest_middleware::Error::Reqwest)
}) {
// todo: should we re-insert JWKs if we want to keep this JWKs URL?
// I expect these failures would be quite sparse.
Err(e) => tracing::warn!(url=?rule.jwks_url, error=?e, "could not fetch JWKs"),
Ok(r) => {
let resp: http::Response<reqwest::Body> = r.into();
let bytes = match read_body_with_limit(resp.into_body(), MAX_JWK_BODY_SIZE)
.await
{
Ok(bytes) => bytes,
Err(e) => {
tracing::warn!(url=?rule.jwks_url, error=?e, "could not decode JWKs");
continue;
}
};
match serde_json::from_slice::<JwkSet>(&bytes) {
Err(e) => {
tracing::warn!(url=?rule.jwks_url, error=?e, "could not decode JWKs");
}
Ok(jwks) => {
// size_of::<&RawValue>() == 16
// size_of::<jose_jwk::Jwk>() == 288
// better to not pre-allocate this as it might be pretty large - especially if it has many
// keys we don't want or need.
// trivial 'attack': `{"keys":[` + repeat(`0`).take(30000).join(`,`) + `]}`
// this would consume 8MiB just like that!
let mut keys = vec![];
let mut failed = 0;
for key in jwks.keys {
match serde_json::from_str::<jose_jwk::Jwk>(key.get()) {
Ok(key) => {
// if `use` (called `cls` in rust) is specified to be something other than signing,
// we can skip storing it.
if key
.prm
.cls
.as_ref()
.is_some_and(|c| *c != jose_jwk::Class::Signing)
{
continue;
}
keys.push(key);
}
Err(e) => {
tracing::debug!(url=?rule.jwks_url, failed=?e, "could not decode JWK");
failed += 1;
}
}
}
keys.shrink_to_fit();
if failed > 0 {
tracing::warn!(url=?rule.jwks_url, failed, "could not decode JWKs");
}
if keys.is_empty() {
tracing::warn!(url=?rule.jwks_url, "no valid JWKs found inside the response body");
continue;
}
let jwks = jose_jwk::JwkSet { keys };
key_sets.insert(
rule.id,
KeySet {
jwks,
audience: rule.audience,
role_names: rule.role_names,
},
);
}
};
}
}
}

View File

@@ -6,6 +6,7 @@ pub mod local;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
pub use console_redirect::ConsoleRedirectBackend;
pub(crate) use console_redirect::ConsoleRedirectError;
@@ -29,7 +30,7 @@ use crate::intern::EndpointIdInt;
use crate::metrics::Metrics;
use crate::proxy::connect_compute::ComputeConnectBackend;
use crate::proxy::NeonOptions;
use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter};
use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter, RateBucketInfo};
use crate::stream::Stream;
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::{scram, stream};
@@ -191,6 +192,21 @@ impl MaskedIp {
// This can't be just per IP because that would limit some PaaS that share IP addresses
pub type AuthRateLimiter = BucketRateLimiter<(EndpointIdInt, MaskedIp)>;
impl RateBucketInfo {
/// All of these are per endpoint-maskedip pair.
/// Context: 4096 rounds of pbkdf2 take about 1ms of cpu time to execute (1 milli-cpu-second or 1mcpus).
///
/// First bucket: 1000mcpus total per endpoint-ip pair
/// * 4096000 requests per second with 1 hash rounds.
/// * 1000 requests per second with 4096 hash rounds.
/// * 6.8 requests per second with 600000 hash rounds.
pub const DEFAULT_AUTH_SET: [Self; 3] = [
Self::new(1000 * 4096, Duration::from_secs(1)),
Self::new(600 * 4096, Duration::from_secs(60)),
Self::new(300 * 4096, Duration::from_secs(600)),
];
}
impl AuthenticationConfig {
pub(crate) fn check_rate_limit(
&self,

View File

@@ -428,9 +428,8 @@ async fn main() -> anyhow::Result<()> {
)?))),
None => None,
};
let cancellation_handler = Arc::new(CancellationHandler::<
Option<Arc<Mutex<RedisPublisherClient>>>,
Option<Arc<tokio::sync::Mutex<RedisPublisherClient>>>,
>::new(
cancel_map.clone(),
redis_publisher,

View File

@@ -10,23 +10,16 @@ use tokio_postgres::{CancelToken, NoTls};
use tracing::{debug, info};
use uuid::Uuid;
use crate::auth::{check_peer_addr_is_in_list, IpPattern};
use crate::error::ReportableError;
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::cancellation_publisher::{
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
};
use std::net::IpAddr;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
pub(crate) type CancellationHandlerMainInternal = Option<Arc<Mutex<RedisPublisherClient>>>;
type IpSubnetKey = IpNet;
/// Enables serving `CancelRequest`s.
///
/// If `CancellationPublisher` is available, cancel request will be used to publish the cancellation key to other proxy instances.
@@ -36,23 +29,14 @@ pub struct CancellationHandler<P> {
/// This field used for the monitoring purposes.
/// Represents the source of the cancellation request.
from: CancellationSource,
// rate limiter of cancellation requests
limiter: Arc<std::sync::Mutex<LeakyBucketRateLimiter<IpSubnetKey>>>,
}
#[derive(Debug, Error)]
pub(crate) enum CancelError {
#[error("{0}")]
IO(#[from] std::io::Error),
#[error("{0}")]
Postgres(#[from] tokio_postgres::Error),
#[error("rate limit exceeded")]
RateLimit,
#[error("IP is not allowed")]
IpNotAllowed,
}
impl ReportableError for CancelError {
@@ -63,8 +47,6 @@ impl ReportableError for CancelError {
crate::error::ErrorKind::Postgres
}
CancelError::Postgres(_) => crate::error::ErrorKind::Compute,
CancelError::RateLimit => crate::error::ErrorKind::RateLimit,
CancelError::IpNotAllowed => crate::error::ErrorKind::User,
}
}
}
@@ -97,36 +79,13 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
cancellation_handler: self,
}
}
/// Try to cancel a running query for the corresponding connection.
/// If the cancellation key is not found, it will be published to Redis.
/// check_allowed - if true, check if the IP is allowed to cancel the query
pub(crate) async fn cancel_session(
&self,
key: CancelKeyData,
session_id: Uuid,
peer_addr: &IpAddr,
check_allowed: bool,
) -> Result<(), CancelError> {
// TODO: check for unspecified address is only for backward compatibility, should be removed
if !peer_addr.is_unspecified() {
let subnet_key = match *peer_addr {
IpAddr::V4(ip) => IpNet::V4(Ipv4Net::new_assert(ip, 24).trunc()), // use defaut mask here
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
};
if !self.limiter.lock().unwrap().check(subnet_key, 1) {
tracing::debug!("Rate limit exceeded. Skipping cancellation message");
Metrics::get()
.proxy
.cancellation_requests_total
.inc(CancellationRequest {
source: self.from,
kind: crate::metrics::CancellationOutcome::RateLimitExceeded,
});
return Err(CancelError::RateLimit);
}
}
// NB: we should immediately release the lock after cloning the token.
let Some(cancel_closure) = self.map.get(&key).and_then(|x| x.clone()) else {
tracing::warn!("query cancellation key not found: {key}");
@@ -137,13 +96,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
source: self.from,
kind: crate::metrics::CancellationOutcome::NotFound,
});
if session_id == Uuid::nil() {
// was already published, do not publish it again
return Ok(());
}
match self.client.try_publish(key, session_id, *peer_addr).await {
match self.client.try_publish(key, session_id).await {
Ok(()) => {} // do nothing
Err(e) => {
return Err(CancelError::IO(std::io::Error::new(
@@ -154,13 +107,6 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
}
return Ok(());
};
if check_allowed
&& !check_peer_addr_is_in_list(peer_addr, cancel_closure.ip_allowlist.as_slice())
{
return Err(CancelError::IpNotAllowed);
}
Metrics::get()
.proxy
.cancellation_requests_total
@@ -189,29 +135,13 @@ impl CancellationHandler<()> {
map,
client: (),
from,
limiter: Arc::new(std::sync::Mutex::new(
LeakyBucketRateLimiter::<IpSubnetKey>::new_with_shards(
LeakyBucketRateLimiter::<IpSubnetKey>::DEFAULT,
64,
),
)),
}
}
}
impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
pub fn new(map: CancelMap, client: Option<Arc<Mutex<P>>>, from: CancellationSource) -> Self {
Self {
map,
client,
from,
limiter: Arc::new(std::sync::Mutex::new(
LeakyBucketRateLimiter::<IpSubnetKey>::new_with_shards(
LeakyBucketRateLimiter::<IpSubnetKey>::DEFAULT,
64,
),
)),
}
Self { map, client, from }
}
}
@@ -222,19 +152,13 @@ impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
pub struct CancelClosure {
socket_addr: SocketAddr,
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
}
impl CancelClosure {
pub(crate) fn new(
socket_addr: SocketAddr,
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
) -> Self {
pub(crate) fn new(socket_addr: SocketAddr, cancel_token: CancelToken) -> Self {
Self {
socket_addr,
cancel_token,
ip_allowlist,
}
}
/// Cancels the query running on user's compute node.
@@ -244,9 +168,6 @@ impl CancelClosure {
debug!("query was cancelled");
Ok(())
}
pub(crate) fn set_ip_allowlist(&mut self, ip_allowlist: Vec<IpPattern>) {
self.ip_allowlist = ip_allowlist;
}
}
/// Helper for registering query cancellation tokens.
@@ -308,8 +229,6 @@ mod tests {
cancel_key: 0,
},
Uuid::new_v4(),
&("127.0.0.1".parse().unwrap()),
true,
)
.await
.unwrap();

View File

@@ -342,7 +342,7 @@ impl ConnCfg {
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token(), vec![]);
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
let connection = PostgresConnection {
stream,

View File

@@ -156,21 +156,16 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let request_gauge = metrics.connection_requests.guard(proto);
let tls = config.tls_config.as_ref();
let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, stream, tls, record_handshake_error);
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
return Ok(cancellation_handler
.cancel_session(
cancel_key_data,
ctx.session_id(),
&ctx.peer_addr(),
config.authentication_config.ip_allowlist_check_enabled,
)
.cancel_session(cancel_key_data, ctx.session_id())
.await
.map(|()| None)?)
}
@@ -179,7 +174,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
ctx.set_db_options(params.clone());
let (user_info, ip_allowlist) = match backend
let user_info = match backend
.authenticate(ctx, &config.authentication_config, &mut stream)
.await
{
@@ -203,8 +198,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
.or_else(|e| stream.throw_error(e))
.await?;
node.cancel_closure
.set_ip_allowlist(ip_allowlist.unwrap_or_default());
let session = cancellation_handler.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;

View File

@@ -351,7 +351,6 @@ pub enum CancellationSource {
pub enum CancellationOutcome {
NotFound,
Found,
RateLimitExceeded,
}
#[derive(LabelGroup)]

View File

@@ -268,18 +268,12 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error);
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
return Ok(cancellation_handler
.cancel_session(
cancel_key_data,
ctx.session_id(),
&ctx.peer_addr(),
config.authentication_config.ip_allowlist_check_enabled,
)
.cancel_session(cancel_key_data, ctx.session_id())
.await
.map(|()| None)?)
}

View File

@@ -14,13 +14,13 @@ use tracing::info;
use crate::intern::EndpointIdInt;
pub struct GlobalRateLimiter {
pub(crate) struct GlobalRateLimiter {
data: Vec<RateBucket>,
info: Vec<RateBucketInfo>,
}
impl GlobalRateLimiter {
pub fn new(info: Vec<RateBucketInfo>) -> Self {
pub(crate) fn new(info: Vec<RateBucketInfo>) -> Self {
Self {
data: vec![
RateBucket {
@@ -34,7 +34,7 @@ impl GlobalRateLimiter {
}
/// Check that number of connections is below `max_rps` rps.
pub fn check(&mut self) -> bool {
pub(crate) fn check(&mut self) -> bool {
let now = Instant::now();
let should_allow_request = self
@@ -137,19 +137,6 @@ impl RateBucketInfo {
Self::new(200, Duration::from_secs(600)),
];
/// All of these are per endpoint-maskedip pair.
/// Context: 4096 rounds of pbkdf2 take about 1ms of cpu time to execute (1 milli-cpu-second or 1mcpus).
///
/// First bucket: 1000mcpus total per endpoint-ip pair
/// * 4096000 requests per second with 1 hash rounds.
/// * 1000 requests per second with 4096 hash rounds.
/// * 6.8 requests per second with 600000 hash rounds.
pub const DEFAULT_AUTH_SET: [Self; 3] = [
Self::new(1000 * 4096, Duration::from_secs(1)),
Self::new(600 * 4096, Duration::from_secs(60)),
Self::new(300 * 4096, Duration::from_secs(600)),
];
pub fn rps(&self) -> f64 {
(self.max_rpi as f64) / self.interval.as_secs_f64()
}

View File

@@ -8,4 +8,5 @@ pub(crate) use limit_algorithm::aimd::Aimd;
pub(crate) use limit_algorithm::{
DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token,
};
pub use limiter::{BucketRateLimiter, GlobalRateLimiter, RateBucketInfo, WakeComputeRateLimiter};
pub(crate) use limiter::GlobalRateLimiter;
pub use limiter::{BucketRateLimiter, RateBucketInfo, WakeComputeRateLimiter};

View File

@@ -1,6 +1,5 @@
use std::sync::Arc;
use core::net::IpAddr;
use pq_proto::CancelKeyData;
use redis::AsyncCommands;
use tokio::sync::Mutex;
@@ -16,7 +15,6 @@ pub trait CancellationPublisherMut: Send + Sync + 'static {
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()>;
}
@@ -26,7 +24,6 @@ pub trait CancellationPublisher: Send + Sync + 'static {
&self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()>;
}
@@ -35,7 +32,6 @@ impl CancellationPublisher for () {
&self,
_cancel_key_data: CancelKeyData,
_session_id: Uuid,
_peer_addr: IpAddr,
) -> anyhow::Result<()> {
Ok(())
}
@@ -46,10 +42,8 @@ impl<P: CancellationPublisher> CancellationPublisherMut for P {
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
<P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id, peer_addr)
.await
<P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id).await
}
}
@@ -58,10 +52,9 @@ impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
&self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
if let Some(p) = self {
p.try_publish(cancel_key_data, session_id, peer_addr).await
p.try_publish(cancel_key_data, session_id).await
} else {
Ok(())
}
@@ -73,11 +66,10 @@ impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
&self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
self.lock()
.await
.try_publish(cancel_key_data, session_id, peer_addr)
.try_publish(cancel_key_data, session_id)
.await
}
}
@@ -105,13 +97,11 @@ impl RedisPublisherClient {
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
let payload = serde_json::to_string(&Notification::Cancel(CancelSession {
region_id: Some(self.region_id.clone()),
cancel_key_data,
session_id,
peer_addr: Some(peer_addr),
}))?;
let _: () = self.client.publish(PROXY_CHANNEL_NAME, payload).await?;
Ok(())
@@ -130,14 +120,13 @@ impl RedisPublisherClient {
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
// TODO: review redundant error duplication logs.
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping cancellation message");
return Err(anyhow::anyhow!("Rate limit exceeded"));
}
match self.publish(cancel_key_data, session_id, peer_addr).await {
match self.publish(cancel_key_data, session_id).await {
Ok(()) => return Ok(()),
Err(e) => {
tracing::error!("failed to publish a message: {e}");
@@ -145,7 +134,7 @@ impl RedisPublisherClient {
}
tracing::info!("Publisher is disconnected. Reconnectiong...");
self.try_connect().await?;
self.publish(cancel_key_data, session_id, peer_addr).await
self.publish(cancel_key_data, session_id).await
}
}
@@ -154,13 +143,9 @@ impl CancellationPublisherMut for RedisPublisherClient {
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
tracing::info!("publishing cancellation key to Redis");
match self
.try_publish_internal(cancel_key_data, session_id, peer_addr)
.await
{
match self.try_publish_internal(cancel_key_data, session_id).await {
Ok(()) => {
tracing::debug!("cancellation key successfuly published to Redis");
Ok(())

View File

@@ -60,7 +60,6 @@ pub(crate) struct CancelSession {
pub(crate) region_id: Option<String>,
pub(crate) cancel_key_data: CancelKeyData,
pub(crate) session_id: Uuid,
pub(crate) peer_addr: Option<std::net::IpAddr>,
}
fn deserialize_json_string<'de, D, T>(deserializer: D) -> Result<T, D::Error>
@@ -138,20 +137,10 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
return Ok(());
}
}
// TODO: Remove unspecified peer_addr after the complete migration to the new format
let peer_addr = cancel_session
.peer_addr
.unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
// This instance of cancellation_handler doesn't have a RedisPublisherClient so it can't publish the message.
match self
.cancellation_handler
.cancel_session(
cancel_session.cancel_key_data,
uuid::Uuid::nil(),
&peer_addr,
cancel_session.peer_addr.is_some(),
)
.cancel_session(cancel_session.cancel_key_data, uuid::Uuid::nil())
.await
{
Ok(()) => {}
@@ -346,7 +335,6 @@ mod tests {
cancel_key_data,
region_id: None,
session_id: uuid,
peer_addr: None,
});
let text = serde_json::to_string(&msg)?;
let result: Notification = serde_json::from_str(&text)?;
@@ -356,7 +344,6 @@ mod tests {
cancel_key_data,
region_id: Some("region".to_string()),
session_id: uuid,
peer_addr: None,
});
let text = serde_json::to_string(&msg)?;
let result: Notification = serde_json::from_str(&text)?;

View File

@@ -51,7 +51,7 @@ testcontainers = "^4.8.1"
jsonnet = "^0.20.0"
[tool.poetry.group.dev.dependencies]
mypy = "==1.13.0"
mypy = "==1.3.0"
ruff = "^0.7.0"
[build-system]

View File

@@ -30,7 +30,6 @@ once_cell.workspace = true
parking_lot.workspace = true
postgres.workspace = true
postgres-protocol.workspace = true
pprof.workspace = true
rand.workspace = true
regex.workspace = true
scopeguard.workspace = true

View File

@@ -14,10 +14,6 @@ cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false
# List available benchmarks.
cargo bench --package safekeeper --benches -- --list
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
# Output in target/criterion/*/profile/flamegraph.svg.
cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false --profile-time 10
```
Additional charts and statistics are available in `target/criterion/report/index.html`.

View File

@@ -10,7 +10,6 @@ use camino_tempfile::tempfile;
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
use itertools::Itertools as _;
use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
use pprof::criterion::{Output, PProfProfiler};
use safekeeper::receive_wal::{self, WalAcceptor};
use safekeeper::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
@@ -25,9 +24,8 @@ const GB: usize = 1024 * MB;
// Register benchmarks with Criterion.
criterion_group!(
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_process_msg,
benches,
bench_process_msg,
bench_wal_acceptor,
bench_wal_acceptor_throughput,
bench_file_write

View File

@@ -1,6 +1,7 @@
use hyper::{Body, Request, Response, StatusCode};
use hyper::{Body, Request, Response, StatusCode, Uri};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::io::Write as _;
use std::str::FromStr;
@@ -13,9 +14,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{
profile_cpu_handler, prometheus_metrics_handler, request_span, ChannelWriter,
};
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
use utils::http::request::parse_query_param;
use postgres_ffi::WAL_SEGMENT_SIZE;
@@ -573,8 +572,14 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
let mut router = endpoint::make_router();
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
const ALLOWLIST_ROUTES: &[&str] = &["/v1/status", "/metrics", "/profile/cpu"];
if ALLOWLIST_ROUTES.contains(&request.uri().path()) {
#[allow(clippy::mutable_key_type)]
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
["/v1/status", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect()
});
if ALLOWLIST_ROUTES.contains(request.uri()) {
None
} else {
// Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
@@ -593,7 +598,6 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.data(Arc::new(conf))
.data(auth)
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
.get("/v1/status", |r| request_span(r, status_handler))
.put("/v1/failpoints", |r| {
request_span(r, move |r| async {

View File

@@ -194,11 +194,9 @@ async def main_impl(args, report_out, client: Client):
tenant_ids = await client.get_tenant_ids()
get_timeline_id_coros = [client.get_timeline_ids(tenant_id) for tenant_id in tenant_ids]
gathered = await asyncio.gather(*get_timeline_id_coros, return_exceptions=True)
assert len(tenant_ids) == len(gathered)
tenant_and_timline_ids = []
for tid, tlids in zip(tenant_ids, gathered, strict=True):
# TODO: add error handling if tlids isinstance(Exception)
assert isinstance(tlids, list)
for tid, tlids in zip(tenant_ids, gathered, strict=False):
for tlid in tlids:
tenant_and_timline_ids.append((tid, tlid))
elif len(comps) == 1:

View File

@@ -31,7 +31,6 @@ CREATE TABLE IF NOT EXISTS results (
duration INT NOT NULL,
flaky BOOLEAN NOT NULL,
arch arch DEFAULT 'X64',
lfc BOOLEAN DEFAULT false NOT NULL,
build_type TEXT NOT NULL,
pg_version INT NOT NULL,
run_id BIGINT NOT NULL,
@@ -55,7 +54,6 @@ class Row:
duration: int
flaky: bool
arch: str
lfc: bool
build_type: str
pg_version: int
run_id: int
@@ -134,7 +132,6 @@ def ingest_test_result(
if p["name"].startswith("__")
}
arch = parameters.get("arch", "UNKNOWN").strip("'")
lfc = parameters.get("lfc", "False") == "True"
build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
labels = {label["name"]: label["value"] for label in test["labels"]}
@@ -148,7 +145,6 @@ def ingest_test_result(
duration=test["time"]["duration"],
flaky=test["flaky"] or test["retriesStatusChange"],
arch=arch,
lfc=lfc,
build_type=build_type,
pg_version=pg_version,
run_id=run_id,

View File

@@ -190,23 +190,8 @@ class TenantTimelineId:
)
@dataclass
class ShardIndex:
shard_number: int
shard_count: int
# cf impl Display for ShardIndex
@override
def __str__(self) -> str:
return f"{self.shard_number:02x}{self.shard_count:02x}"
@classmethod
def parse(cls: type[ShardIndex], input: str) -> ShardIndex:
assert len(input) == 4
return cls(
shard_number=int(input[0:2], 16),
shard_count=int(input[2:4], 16),
)
# Workaround for compat with python 3.9, which does not have `typing.Self`
TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId")
class TenantShardId:
@@ -217,7 +202,7 @@ class TenantShardId:
assert self.shard_number < self.shard_count or self.shard_count == 0
@classmethod
def parse(cls: type[TenantShardId], input: str) -> TenantShardId:
def parse(cls: type[TTenantShardId], input: str) -> TTenantShardId:
if len(input) == 32:
return cls(
tenant_id=TenantId(input),
@@ -241,10 +226,6 @@ class TenantShardId:
# Unsharded case: equivalent of Rust TenantShardId::unsharded(tenant_id)
return str(self.tenant_id)
@property
def shard_index(self) -> ShardIndex:
return ShardIndex(self.shard_number, self.shard_count)
@override
def __repr__(self):
return self.__str__()

View File

@@ -69,7 +69,7 @@ def compute_reconfigure_listener(make_httpserver: HTTPServer):
# This causes the endpoint to query storage controller for its location, which
# is redundant since we already have it here, but this avoids extending the
# neon_local CLI to take full lists of locations
reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[misc]
reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[no-any-return]
return Response(status=200)

View File

@@ -20,9 +20,12 @@ from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from typing import (
Any,
TypeVar,
cast,
)
T = TypeVar("T")
# Used to be an ABC. abc.ABC removed due to linter without name change.
class AbstractNeonCli:

View File

@@ -90,12 +90,10 @@ from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
COMPONENT_BINARIES,
USE_LFC,
allure_add_grafana_links,
assert_no_errors,
get_dir_size,
print_gc_result,
size_to_bytes,
subprocess_capture,
wait_until,
)
@@ -104,7 +102,10 @@ from .neon_api import NeonAPI, NeonApiEndpoint
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Self, TypeVar
from typing import (
Any,
TypeVar,
)
from fixtures.paths import SnapshotDirLocked
@@ -837,7 +838,7 @@ class NeonEnvBuilder:
if isinstance(x, S3Storage):
x.do_cleanup()
def __enter__(self) -> Self:
def __enter__(self) -> NeonEnvBuilder:
return self
def __exit__(
@@ -1147,19 +1148,21 @@ class NeonEnv:
with concurrent.futures.ThreadPoolExecutor(
max_workers=2 + len(self.pageservers) + len(self.safekeepers)
) as executor:
futs.append(executor.submit(lambda: self.broker.start()))
futs.append(
executor.submit(lambda: self.broker.start() or None)
) # The `or None` is for the linter
for pageserver in self.pageservers:
futs.append(
executor.submit(
lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) # type: ignore[misc]
lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds)
)
)
for safekeeper in self.safekeepers:
futs.append(
executor.submit(
lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds) # type: ignore[misc]
lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds)
)
)
@@ -1599,13 +1602,13 @@ class NeonStorageController(MetricsGetter, LogUtils):
timeout_in_seconds: int | None = None,
instance_id: int | None = None,
base_port: int | None = None,
) -> Self:
):
assert not self.running
self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port)
self.running = True
return self
def stop(self, immediate: bool = False) -> Self:
def stop(self, immediate: bool = False) -> NeonStorageController:
if self.running:
self.env.neon_cli.storage_controller_stop(immediate)
self.running = False
@@ -1885,20 +1888,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
log.info(f"tenant_create success: {response.json()}")
def timeline_create(
self,
tenant_id: TenantId,
body: dict[str, Any],
):
response = self.request(
"POST",
f"{self.api}/v1/tenant/{tenant_id}/timeline",
json=body,
headers=self.headers(TokenScope.PAGE_SERVER_API),
)
response.raise_for_status()
log.info(f"timeline_create success: {response.json()}")
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
"""
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int}
@@ -2293,7 +2282,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
return [TenantShardId.parse(tid) for tid in response.json()["updated"]]
def __enter__(self) -> Self:
def __enter__(self) -> NeonStorageController:
return self
def __exit__(
@@ -2315,7 +2304,7 @@ class NeonProxiedStorageController(NeonStorageController):
timeout_in_seconds: int | None = None,
instance_id: int | None = None,
base_port: int | None = None,
) -> Self:
):
assert instance_id is not None and base_port is not None
self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port)
@@ -2335,7 +2324,7 @@ class NeonProxiedStorageController(NeonStorageController):
self.running = any(meta["running"] for meta in self.instances.values())
return self
def stop(self, immediate: bool = False) -> Self:
def stop(self, immediate: bool = False) -> NeonStorageController:
for iid, details in self.instances.items():
if details["running"]:
self.env.neon_cli.storage_controller_stop(immediate, iid)
@@ -2457,7 +2446,7 @@ class NeonPageserver(PgProtocol, LogUtils):
self,
extra_env_vars: dict[str, str] | None = None,
timeout_in_seconds: int | None = None,
) -> Self:
) -> NeonPageserver:
"""
Start the page server.
`overrides` allows to add some config to this pageserver start.
@@ -2492,7 +2481,7 @@ class NeonPageserver(PgProtocol, LogUtils):
return self
def stop(self, immediate: bool = False) -> Self:
def stop(self, immediate: bool = False) -> NeonPageserver:
"""
Stop the page server.
Returns self.
@@ -2540,7 +2529,7 @@ class NeonPageserver(PgProtocol, LogUtils):
wait_until(20, 0.5, complete)
def __enter__(self) -> Self:
def __enter__(self) -> NeonPageserver:
return self
def __exit__(
@@ -2968,7 +2957,7 @@ class VanillaPostgres(PgProtocol):
"""Return size of pgdatadir subdirectory in bytes."""
return get_dir_size(self.pgdatadir / subdir)
def __enter__(self) -> Self:
def __enter__(self) -> VanillaPostgres:
return self
def __exit__(
@@ -3017,7 +3006,7 @@ class RemotePostgres(PgProtocol):
# See https://www.postgresql.org/docs/14/functions-admin.html#FUNCTIONS-ADMIN-GENFILE
raise Exception("cannot get size of a Postgres instance")
def __enter__(self) -> Self:
def __enter__(self) -> RemotePostgres:
return self
def __exit__(
@@ -3231,7 +3220,7 @@ class NeonProxy(PgProtocol):
self.http_timeout_seconds = 15
self._popen: subprocess.Popen[bytes] | None = None
def start(self) -> Self:
def start(self) -> NeonProxy:
assert self._popen is None
# generate key of it doesn't exist
@@ -3359,7 +3348,7 @@ class NeonProxy(PgProtocol):
log.info(f"SUCCESS, found auth url: {line}")
return line
def __enter__(self) -> Self:
def __enter__(self) -> NeonProxy:
return self
def __exit__(
@@ -3449,7 +3438,7 @@ class NeonAuthBroker:
self.http_timeout_seconds = 15
self._popen: subprocess.Popen[bytes] | None = None
def start(self) -> Self:
def start(self) -> NeonAuthBroker:
assert self._popen is None
# generate key of it doesn't exist
@@ -3518,7 +3507,7 @@ class NeonAuthBroker:
request_result = requests.get(f"http://{self.host}:{self.http_port}/metrics")
return request_result.text
def __enter__(self) -> Self:
def __enter__(self) -> NeonAuthBroker:
return self
def __exit__(
@@ -3715,7 +3704,7 @@ class Endpoint(PgProtocol, LogUtils):
config_lines: list[str] | None = None,
pageserver_id: int | None = None,
allow_multiple: bool = False,
) -> Self:
) -> Endpoint:
"""
Create a new Postgres endpoint.
Returns self.
@@ -3744,45 +3733,12 @@ class Endpoint(PgProtocol, LogUtils):
self.pgdata_dir = self.env.repo_dir / path
self.logfile = self.endpoint_path() / "compute.log"
config_lines = config_lines or []
# set small 'max_replication_write_lag' to enable backpressure
# and make tests more stable.
config_lines = ["max_replication_write_lag=15MB"] + config_lines
# Delete file cache if it exists (and we're recreating the endpoint)
if USE_LFC:
if (lfc_path := Path(self.lfc_path())).exists():
lfc_path.unlink()
else:
lfc_path.parent.mkdir(parents=True, exist_ok=True)
for line in config_lines:
if (
line.find("neon.max_file_cache_size") > -1
or line.find("neon.file_cache_size_limit") > -1
):
m = re.search(r"=\s*(\S+)", line)
assert m is not None, f"malformed config line {line}"
size = m.group(1)
assert size_to_bytes(size) >= size_to_bytes(
"1MB"
), "LFC size cannot be set less than 1MB"
# shared_buffers = 512kB to make postgres use LFC intensively
# neon.max_file_cache_size and neon.file_cache size limit are
# set to 1MB because small LFC is better for testing (helps to find more problems)
config_lines = [
"shared_buffers = 512kB",
f"neon.file_cache_path = '{self.lfc_path()}'",
"neon.max_file_cache_size = 1MB",
"neon.file_cache_size_limit = 1MB",
] + config_lines
else:
for line in config_lines:
assert (
line.find("neon.max_file_cache_size") == -1
), "Setting LFC parameters is not allowed when LFC is disabled"
assert (
line.find("neon.file_cache_size_limit") == -1
), "Setting LFC parameters is not allowed when LFC is disabled"
self.config(config_lines)
return self
@@ -3794,7 +3750,7 @@ class Endpoint(PgProtocol, LogUtils):
safekeepers: list[int] | None = None,
allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
) -> Self:
) -> Endpoint:
"""
Start the Postgres instance.
Returns self.
@@ -3816,9 +3772,6 @@ class Endpoint(PgProtocol, LogUtils):
basebackup_request_tries=basebackup_request_tries,
)
self._running.release(1)
self.log_config_value("shared_buffers")
self.log_config_value("neon.max_file_cache_size")
self.log_config_value("neon.file_cache_size_limit")
return self
@@ -3844,11 +3797,7 @@ class Endpoint(PgProtocol, LogUtils):
"""Path to the postgresql.conf in the endpoint directory (not the one in pgdata)"""
return self.endpoint_path() / "postgresql.conf"
def lfc_path(self) -> Path:
"""Path to the lfc file"""
return self.endpoint_path() / "file_cache" / "file.cache"
def config(self, lines: list[str]) -> Self:
def config(self, lines: list[str]) -> Endpoint:
"""
Add lines to postgresql.conf.
Lines should be an array of valid postgresql.conf rows.
@@ -3924,7 +3873,7 @@ class Endpoint(PgProtocol, LogUtils):
self,
mode: str = "fast",
sks_wait_walreceiver_gone: tuple[list[Safekeeper], TimelineId] | None = None,
) -> Self:
) -> Endpoint:
"""
Stop the Postgres instance if it's running.
@@ -3958,7 +3907,7 @@ class Endpoint(PgProtocol, LogUtils):
return self
def stop_and_destroy(self, mode: str = "immediate") -> Self:
def stop_and_destroy(self, mode: str = "immediate") -> Endpoint:
"""
Stop the Postgres instance, then destroy the endpoint.
Returns self.
@@ -3985,7 +3934,7 @@ class Endpoint(PgProtocol, LogUtils):
pageserver_id: int | None = None,
allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
) -> Self:
) -> Endpoint:
"""
Create an endpoint, apply config, and start Postgres.
Returns self.
@@ -4008,7 +3957,7 @@ class Endpoint(PgProtocol, LogUtils):
return self
def __enter__(self) -> Self:
def __enter__(self) -> Endpoint:
return self
def __exit__(
@@ -4026,46 +3975,16 @@ class Endpoint(PgProtocol, LogUtils):
assert self.pgdata_dir is not None # please mypy
return get_dir_size(self.pgdata_dir / "pg_wal") / 1024 / 1024
def clear_buffers(self, cursor: Any | None = None):
def clear_shared_buffers(self, cursor: Any | None = None):
"""
Best-effort way to clear postgres buffers. Pinned buffers will not be 'cleared.'
It clears LFC as well by setting neon.file_cache_size_limit to 0 and then returning it to the previous value,
if LFC is enabled
Might also clear LFC.
"""
if cursor is not None:
cursor.execute("select clear_buffer_cache()")
if not USE_LFC:
return
cursor.execute("SHOW neon.file_cache_size_limit")
res = cursor.fetchone()
assert res, "Cannot get neon.file_cache_size_limit"
file_cache_size_limit = res[0]
if file_cache_size_limit == 0:
return
cursor.execute("ALTER SYSTEM SET neon.file_cache_size_limit=0")
cursor.execute("SELECT pg_reload_conf()")
cursor.execute(f"ALTER SYSTEM SET neon.file_cache_size_limit='{file_cache_size_limit}'")
cursor.execute("SELECT pg_reload_conf()")
else:
self.safe_psql("select clear_buffer_cache()")
if not USE_LFC:
return
file_cache_size_limit = self.safe_psql_scalar(
"SHOW neon.file_cache_size_limit", log_query=False
)
if file_cache_size_limit == 0:
return
self.safe_psql("ALTER SYSTEM SET neon.file_cache_size_limit=0")
self.safe_psql("SELECT pg_reload_conf()")
self.safe_psql(f"ALTER SYSTEM SET neon.file_cache_size_limit='{file_cache_size_limit}'")
self.safe_psql("SELECT pg_reload_conf()")
def log_config_value(self, param):
"""
Writes the config value param to log
"""
res = self.safe_psql_scalar(f"SHOW {param}", log_query=False)
log.info("%s = %s", param, res)
class EndpointFactory:
@@ -4139,7 +4058,7 @@ class EndpointFactory:
pageserver_id=pageserver_id,
)
def stop_all(self, fail_on_error=True) -> Self:
def stop_all(self, fail_on_error=True) -> EndpointFactory:
exception = None
for ep in self.endpoints:
try:
@@ -4235,7 +4154,7 @@ class Safekeeper(LogUtils):
def start(
self, extra_opts: list[str] | None = None, timeout_in_seconds: int | None = None
) -> Self:
) -> Safekeeper:
if extra_opts is None:
# Apply either the extra_opts passed in, or the ones from our constructor: we do not merge the two.
extra_opts = self.extra_opts
@@ -4270,7 +4189,7 @@ class Safekeeper(LogUtils):
break # success
return self
def stop(self, immediate: bool = False) -> Self:
def stop(self, immediate: bool = False) -> Safekeeper:
self.env.neon_cli.safekeeper_stop(self.id, immediate)
self.running = False
return self
@@ -4448,13 +4367,13 @@ class NeonBroker(LogUtils):
def start(
self,
timeout_in_seconds: int | None = None,
) -> Self:
):
assert not self.running
self.env.neon_cli.storage_broker_start(timeout_in_seconds)
self.running = True
return self
def stop(self) -> Self:
def stop(self):
if self.running:
self.env.neon_cli.storage_broker_stop()
self.running = False

View File

@@ -1,9 +1,5 @@
from __future__ import annotations
import dataclasses
import json
import random
import string
import time
from collections import defaultdict
from dataclasses import dataclass
@@ -14,14 +10,7 @@ import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from fixtures.common_types import (
Id,
Lsn,
TenantId,
TenantShardId,
TimelineArchivalState,
TimelineId,
)
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchivalState, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pg_version import PgVersion
@@ -35,69 +24,6 @@ class PageserverApiException(Exception):
self.status_code = status_code
@dataclass
class ImportPgdataIdemptencyKey:
key: str
@staticmethod
def random() -> ImportPgdataIdemptencyKey:
return ImportPgdataIdemptencyKey(
"".join(random.choices(string.ascii_letters + string.digits, k=20))
)
@dataclass
class LocalFs:
path: str
@dataclass
class AwsS3:
region: str
bucket: str
key: str
@dataclass
class ImportPgdataLocation:
LocalFs: None | LocalFs = None
AwsS3: None | AwsS3 = None
@dataclass
class TimelineCreateRequestModeImportPgdata:
location: ImportPgdataLocation
idempotency_key: ImportPgdataIdemptencyKey
@dataclass
class TimelineCreateRequestMode:
Branch: None | dict[str, Any] = None
Bootstrap: None | dict[str, Any] = None
ImportPgdata: None | TimelineCreateRequestModeImportPgdata = None
@dataclass
class TimelineCreateRequest:
new_timeline_id: TimelineId
mode: TimelineCreateRequestMode
def to_json(self) -> str:
class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, o):
if dataclasses.is_dataclass(o) and not isinstance(o, type):
return dataclasses.asdict(o)
elif isinstance(o, Id):
return o.id.hex()
return super().default(o)
# mode is flattened
this = dataclasses.asdict(self)
mode = this.pop("mode")
this.update(mode)
return json.dumps(self, cls=EnhancedJSONEncoder)
class TimelineCreate406(PageserverApiException):
def __init__(self, res: requests.Response):
assert res.status_code == 406
@@ -868,9 +794,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
if compact is not None:
query["compact"] = "true" if compact else "false"
log.info(
f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}"
)
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
params=query,

View File

@@ -66,7 +66,6 @@ def pytest_generate_tests(metafunc: Metafunc):
metafunc.parametrize("build_type", build_types)
pg_versions: list[PgVersion]
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
@@ -116,6 +115,5 @@ def pytest_runtest_makereport(*args, **kwargs):
}.get(os.uname().machine, "UNKNOWN")
arch = os.getenv("RUNNER_ARCH", uname_m)
allure.dynamic.parameter("__arch", arch)
allure.dynamic.parameter("__lfc", os.getenv("USE_LFC") != "false")
yield

View File

@@ -57,10 +57,6 @@ VERSIONS_COMBINATIONS = (
)
# fmt: on
# If the environment variable USE_LFC is set and its value is "false", then LFC is disabled for tests.
# If it is not set or set to a value not equal to "false", LFC is enabled by default.
USE_LFC = os.environ.get("USE_LFC") != "false"
def subprocess_capture(
capture_dir: Path,
@@ -657,23 +653,6 @@ def allpairs_versions():
return {"argnames": "combination", "argvalues": tuple(argvalues), "ids": ids}
def size_to_bytes(hr_size: str) -> int:
"""
Gets human-readable size from postgresql.conf (e.g. 512kB, 10MB)
returns size in bytes
"""
units = {"B": 1, "kB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4, "PB": 1024**5}
match = re.search(r"^\'?(\d+)\s*([kMGTP]?B)?\'?$", hr_size)
assert match is not None, f'"{hr_size}" is not a well-formatted human-readable size'
number, unit = match.groups()
if unit:
amp = units[unit]
else:
amp = 8192
return int(number) * amp
def skip_on_postgres(version: PgVersion, reason: str):
return pytest.mark.skipif(
PgVersion(os.getenv("DEFAULT_PG_VERSION", PgVersion.DEFAULT)) is version,
@@ -695,13 +674,6 @@ def run_only_on_default_postgres(reason: str):
)
def run_only_on_postgres(versions: Iterable[PgVersion], reason: str):
return pytest.mark.skipif(
PgVersion(os.getenv("DEFAULT_PG_VERSION", PgVersion.DEFAULT)) not in versions,
reason=reason,
)
def skip_in_debug_build(reason: str):
return pytest.mark.skipif(
os.getenv("BUILD_TYPE", "debug") == "debug",

View File

@@ -53,7 +53,7 @@ class Workload:
self._endpoint: Endpoint | None = None
self._endpoint_opts = endpoint_opts or {}
def reconfigure(self) -> None:
def reconfigure(self):
"""
Request the endpoint to reconfigure based on location reported by storage controller
"""
@@ -94,10 +94,9 @@ class Workload:
def __del__(self):
self.stop()
def init(self, pageserver_id: int | None = None, allow_recreate=False):
def init(self, pageserver_id: int | None = None):
endpoint = self.endpoint(pageserver_id)
if allow_recreate:
endpoint.safe_psql(f"DROP TABLE IF EXISTS {self.table};")
endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
last_flush_lsn_upload(
@@ -193,7 +192,7 @@ class Workload:
def validate(self, pageserver_id: int | None = None):
endpoint = self.endpoint(pageserver_id)
endpoint.clear_buffers()
endpoint.clear_shared_buffers()
result = endpoint.safe_psql(f"SELECT COUNT(*) FROM {self.table}")
log.info(f"validate({self.expect_rows}): {result}")

View File

@@ -56,7 +56,7 @@ def test_bulk_insert(neon_with_baseline: PgCompare):
def measure_recovery_time(env: NeonCompare):
client = env.env.pageserver.http_client()
pg_version = PgVersion(str(client.timeline_detail(env.tenant, env.timeline)["pg_version"]))
pg_version = PgVersion(client.timeline_detail(env.tenant, env.timeline)["pg_version"])
# Delete the Tenant in the pageserver: this will drop local and remote layers, such that
# when we "create" the Tenant again, we will replay the WAL from the beginning.

View File

@@ -5,7 +5,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
conn = endpoint.connect()
cur = conn.cursor()
@@ -31,7 +36,7 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
# Clear the cache, so that we exercise reconstructing the pages
# from WAL
endpoint.clear_buffers()
endpoint.clear_shared_buffers()
# Check that the cursor opened earlier still works. If the
# combocids are not restored correctly, it won't.
@@ -60,7 +65,12 @@ def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
conn = endpoint.connect()
cur = conn.cursor()
@@ -88,7 +98,7 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
cur.execute("delete from t")
# Clear the cache, so that we exercise reconstructing the pages
# from WAL
endpoint.clear_buffers()
endpoint.clear_shared_buffers()
# Check that the cursor opened earlier still works. If the
# combocids are not restored correctly, it won't.

View File

@@ -17,7 +17,7 @@ from fixtures.paths import BASE_DIR, COMPUTE_CONFIG_DIR
if TYPE_CHECKING:
from types import TracebackType
from typing import Self, TypedDict
from typing import TypedDict
from fixtures.neon_fixtures import NeonEnv
from fixtures.pg_version import PgVersion
@@ -185,7 +185,7 @@ class SqlExporterRunner:
def stop(self) -> None:
raise NotImplementedError()
def __enter__(self) -> Self:
def __enter__(self) -> SqlExporterRunner:
self.start()
return self
@@ -242,7 +242,8 @@ if SQL_EXPORTER is None:
self.with_volume_mapping(str(config_file), container_config_file, "z")
self.with_volume_mapping(str(collector_file), container_collector_file, "z")
def start(self) -> Self:
@override
def start(self) -> SqlExporterContainer:
super().start()
log.info("Waiting for sql_exporter to be ready")

View File

@@ -13,7 +13,7 @@ from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
if TYPE_CHECKING:
from typing import Any, Self
from typing import Any
def handle_db(dbs, roles, operation):
@@ -91,7 +91,7 @@ class DdlForwardingContext:
lambda request: ddl_forward_handler(request, self.dbs, self.roles, self)
)
def __enter__(self) -> Self:
def __enter__(self):
self.pg.start()
return self

View File

@@ -2,13 +2,10 @@ from __future__ import annotations
from pathlib import Path
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_explain_with_lfc_stats(neon_simple_env: NeonEnv):
env = neon_simple_env
@@ -19,6 +16,8 @@ def test_explain_with_lfc_stats(neon_simple_env: NeonEnv):
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
f"neon.file_cache_path='{cache_dir}/file.cache'",
"neon.max_file_cache_size='128MB'",
"neon.file_cache_size_limit='64MB'",
],

View File

@@ -170,7 +170,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
# re-execute the query, it will make GetPage
# requests. This does not clear the last-written LSN cache
# so we still remember the LSNs of the pages.
secondary.clear_buffers(cursor=s_cur)
secondary.clear_shared_buffers(cursor=s_cur)
if pause_apply:
s_cur.execute("SELECT pg_wal_replay_pause()")

View File

@@ -1,307 +0,0 @@
import json
import re
import time
from enum import Enum
import psycopg2
import psycopg2.errors
import pytest
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, VanillaPostgres
from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey,
PageserverApiException,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import run_only_on_postgres
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
num_rows = 1000
class RelBlockSize(Enum):
ONE_STRIPE_SIZE = 1
TWO_STRPES_PER_SHARD = 2
MULTIPLE_RELATION_SEGMENTS = 3
smoke_params = [
# unsharded (the stripe size needs to be given for rel block size calculations)
*[(None, 1024, s) for s in RelBlockSize],
# many shards, small stripe size to speed up test
*[(8, 1024, s) for s in RelBlockSize],
]
@run_only_on_postgres(
[PgVersion.V14, PgVersion.V15, PgVersion.V16],
"newer control file catalog version and struct format isn't supported",
)
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
def test_pgdata_import_smoke(
vanilla_pg: VanillaPostgres,
neon_env_builder: NeonEnvBuilder,
shard_count: int | None,
stripe_size: int,
rel_block_size: RelBlockSize,
make_httpserver: HTTPServer,
):
#
# Setup fake control plane for import progress
#
def handler(request: Request) -> Response:
log.info(f"control plane request: {request.json}")
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
env.pageserver.patch_config_toml_nonrecursive(
{
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api"
}
)
env.pageserver.stop()
env.pageserver.start()
#
# Put data in vanilla pg
#
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
log.info("create relblock data")
if rel_block_size == RelBlockSize.ONE_STRIPE_SIZE:
target_relblock_size = stripe_size * 8192
elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2
elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS:
target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192)
else:
raise ValueError
# fillfactor so we don't need to produce that much data
# 900 byte per row is > 10% => 1 row per page
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
nrows = 0
while True:
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
log.info(
f"relblock size: {relblock_size/8192} pages (target: {target_relblock_size//8192}) pages"
)
if relblock_size >= target_relblock_size:
break
addrows = int((target_relblock_size - relblock_size) // 8192)
assert addrows >= 1, "forward progress"
vanilla_pg.safe_psql(f"insert into t select generate_series({nrows+1}, {nrows + addrows})")
nrows += addrows
expect_nrows = nrows
expect_sum = (
(nrows) * (nrows + 1) // 2
) # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n
def validate_vanilla_equivalence(ep):
# TODO: would be nicer to just compare pgdump
assert ep.safe_psql("select count(*), sum(data::bigint)::bigint from t") == [
(expect_nrows, expect_sum)
]
validate_vanilla_equivalence(vanilla_pg)
vanilla_pg.stop()
#
# We have a Postgres data directory now.
# Make a localfs remote storage that looks like how after `fast_import` ran.
# TODO: actually exercise fast_import here
# TODO: test s3 remote storage
#
importbucket = neon_env_builder.repo_dir / "importbucket"
importbucket.mkdir()
# what cplane writes before scheduling fast_import
specpath = importbucket / "spec.json"
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
# what fast_import writes
vanilla_pg.pgdatadir.rename(importbucket / "pgdata")
statusdir = importbucket / "status"
statusdir.mkdir()
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
#
# Do the import
#
tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id, shard_count=shard_count, shard_stripe_size=stripe_size
)
timeline_id = TimelineId.generate()
log.info("starting import")
start = time.monotonic()
idempotency = ImportPgdataIdemptencyKey.random()
log.info(f"idempotency key {idempotency}")
# TODO: teach neon_local CLI about the idempotency & 429 error so we can run inside the loop
# and check for 429
import_branch_name = "imported"
env.storage_controller.timeline_create(
tenant_id,
{
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {"LocalFs": {"path": str(importbucket.absolute())}},
},
},
)
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
while True:
locations = env.storage_controller.locate(tenant_id)
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
shard_status_file = statusdir / f"shard-{shard_id.shard_index}"
if state == "Active":
shard_status_file_contents = (
shard_status_file.read_text()
) # Active state implies import is done
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(1)
import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s")
#
# Get some timeline details for later.
#
locations = env.storage_controller.locate(tenant_id)
[shard_zero] = [
loc for loc in locations if TenantShardId.parse(loc["shard_id"]).shard_number == 0
]
shard_zero_ps = env.get_pageserver(shard_zero["node_id"])
shard_zero_http = shard_zero_ps.http_client()
shard_zero_timeline_info = shard_zero_http.timeline_detail(shard_zero["shard_id"], timeline_id)
initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
latest_gc_cutoff_lsn = Lsn(shard_zero_timeline_info["latest_gc_cutoff_lsn"])
last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"])
disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"])
_remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"])
remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"])
# assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None`
assert remote_consistent_lsn_visible == disk_consistent_lsn
assert initdb_lsn == latest_gc_cutoff_lsn
assert disk_consistent_lsn == initdb_lsn + 8
assert last_record_lsn == disk_consistent_lsn
# TODO: assert these values are the same everywhere
#
# Validate the resulting remote storage state.
#
#
# Validate the imported data
#
ro_endpoint = env.endpoints.create_start(
branch_name=import_branch_name, endpoint_id="ro", tenant_id=tenant_id, lsn=last_record_lsn
)
validate_vanilla_equivalence(ro_endpoint)
# ensure the import survives restarts
ro_endpoint.stop()
env.pageserver.stop(immediate=True)
env.pageserver.start()
ro_endpoint.start()
validate_vanilla_equivalence(ro_endpoint)
#
# validate the layer files in each shard only have the shard-specific data
# (the implementation would be functional but not efficient without this characteristic)
#
shards = env.storage_controller.locate(tenant_id)
for shard in shards:
shard_ps = env.get_pageserver(shard["node_id"])
result = shard_ps.timeline_scan_no_disposable_keys(shard["shard_id"], timeline_id)
assert result.tally.disposable_count == 0
assert (
result.tally.not_disposable_count > 0
), "sanity check, each shard should have some data"
#
# validate that we can write
#
rw_endpoint = env.endpoints.create_start(
branch_name=import_branch_name, endpoint_id="rw", tenant_id=tenant_id
)
rw_endpoint.safe_psql("create table othertable(values text)")
rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
# TODO: consider using `class Workload` here
# to do compaction and whatnot?
#
# validate that we can branch (important use case)
#
# ... at the tip
_ = env.create_branch(
new_branch_name="br-tip",
ancestor_branch_name=import_branch_name,
tenant_id=tenant_id,
ancestor_start_lsn=rw_lsn,
)
br_tip_endpoint = env.endpoints.create_start(
branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id
)
validate_vanilla_equivalence(br_tip_endpoint)
br_tip_endpoint.safe_psql("select * from othertable")
# ... at the initdb lsn
_ = env.create_branch(
new_branch_name="br-initdb",
ancestor_branch_name=import_branch_name,
tenant_id=tenant_id,
ancestor_start_lsn=initdb_lsn,
)
br_initdb_endpoint = env.endpoints.create_start(
branch_name="br-initdb", endpoint_id="br-initdb-ro", tenant_id=tenant_id
)
validate_vanilla_equivalence(br_initdb_endpoint)
with pytest.raises(psycopg2.errors.UndefinedTable):
br_initdb_endpoint.safe_psql("select * from othertable")

View File

@@ -99,15 +99,11 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
res = client.metrics()
info("Metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all(
"compute_installed_extensions", {"extension_name": "neon", "version": "1.2"}
)
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.2"})
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 2
neon_m = m.query_all(
"compute_installed_extensions", {"extension_name": "neon", "version": "1.3"}
)
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.3"})
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 1
@@ -120,7 +116,7 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
try:
res = client.metrics()
timeout = -1
if len(parse_metrics(res).query_all("compute_installed_extensions")) < 4:
if len(parse_metrics(res).query_all("installed_extensions")) < 4:
# Assume that not all metrics that are collected yet
time.sleep(1)
timeout -= 1
@@ -132,21 +128,17 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
continue
assert (
len(parse_metrics(res).query_all("compute_installed_extensions")) >= 4
len(parse_metrics(res).query_all("installed_extensions")) >= 4
), "Not all metrics are collected"
info("After restart metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all(
"compute_installed_extensions", {"extension_name": "neon", "version": "1.2"}
)
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.2"})
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 1
neon_m = m.query_all(
"compute_installed_extensions", {"extension_name": "neon", "version": "1.3"}
)
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.3"})
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 1

View File

@@ -271,9 +271,3 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder, attach_mode: str):
# Ensure no weird errors in the end...
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
if attach_mode == "same_generation":
# we should have detected a race upload and deferred it
env.pageserver.assert_log_contains(
"waiting for deletion queue flush to complete before uploading layer"
)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import os
import random
import re
import subprocess
@@ -9,24 +10,20 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.utils import USE_LFC
@pytest.mark.timeout(600)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
"""
Test resizing the Local File Cache
"""
env = neon_simple_env
cache_dir = env.repo_dir / "file_cache"
cache_dir.mkdir(exist_ok=True)
env.create_branch("test_lfc_resize")
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=512MB",
"neon.file_cache_size_limit=512MB",
],
)
n_resize = 10
@@ -66,8 +63,8 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
cur.execute("select pg_reload_conf()")
nretries = 10
while True:
lfc_file_path = endpoint.lfc_path()
lfc_file_size = lfc_file_path.stat().st_size
lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
lfc_file_size = os.path.getsize(lfc_file_path)
res = subprocess.run(
["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True
)

View File

@@ -3,13 +3,11 @@ from __future__ import annotations
import time
from pathlib import Path
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC, query_scalar
from fixtures.utils import query_scalar
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_working_set_approximation(neon_simple_env: NeonEnv):
env = neon_simple_env
@@ -20,6 +18,8 @@ def test_lfc_working_set_approximation(neon_simple_env: NeonEnv):
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
f"neon.file_cache_path='{cache_dir}/file.cache'",
"neon.max_file_cache_size='128MB'",
"neon.file_cache_size_limit='64MB'",
],
@@ -72,10 +72,9 @@ WITH (fillfactor='100');
# verify working set size after some index access of a few select pages only
blocks = query_scalar(cur, "select approximate_working_set_size(true)")
log.info(f"working set size after some index access of a few select pages only {blocks}")
assert blocks < 12
assert blocks < 10
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
env = neon_simple_env

View File

@@ -6,12 +6,10 @@ import random
import threading
import time
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import USE_LFC, query_scalar
from fixtures.utils import query_scalar
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
@@ -21,6 +19,8 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
f"neon.file_cache_path='{cache_dir}/file.cache'",
"neon.max_file_cache_size='64MB'",
"neon.file_cache_size_limit='10MB'",
],

View File

@@ -12,7 +12,7 @@ from fixtures.neon_fixtures import (
logical_replication_sync,
wait_for_last_flush_lsn,
)
from fixtures.utils import USE_LFC, wait_until
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import (
@@ -576,15 +576,7 @@ def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg: Van
# We want all data to fit into shared_buffers because later we stop
# safekeeper and insert more; this shouldn't cause page requests as they
# will be stuck.
sub = env.endpoints.create(
"subscriber",
config_lines=[
"neon.max_file_cache_size = 32MB",
"neon.file_cache_size_limit = 32MB",
]
if USE_LFC
else [],
)
sub = env.endpoints.create("subscriber", config_lines=["shared_buffers=128MB"])
sub.start()
with vanilla_pg.cursor() as pcur:

View File

@@ -39,7 +39,7 @@ def test_oid_overflow(neon_env_builder: NeonEnvBuilder):
oid = cur.fetchall()[0][0]
log.info(f"t2.relfilenode={oid}")
endpoint.clear_buffers(cursor=cur)
endpoint.clear_shared_buffers(cursor=cur)
cur.execute("SELECT x from t1")
assert cur.fetchone() == (1,)

View File

@@ -131,7 +131,7 @@ def test_pageserver_small_inmemory_layers(
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
# We didn't write enough data to trigger a size-based checkpoint: we should see dirty data.
wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env))
wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore
ps_http_client = env.pageserver.http_client()
total_wal_ingested_before_restart = wait_for_wal_ingest_metric(ps_http_client)
@@ -139,7 +139,7 @@ def test_pageserver_small_inmemory_layers(
# Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed,
# such that there are zero bytes of ephemeral layer left on the pageserver
log.info("Waiting for background checkpoints...")
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0))
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # type: ignore
# Zero ephemeral layer bytes does not imply that all the frozen layers were uploaded: they
# must be uploaded to remain visible to the pageserver after restart.
@@ -180,7 +180,7 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder):
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
# We didn't write enough data to trigger a size-based checkpoint: we should see dirty data.
wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env))
wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore
# Stop the safekeepers, so that we cannot have any more WAL receiver connections
for sk in env.safekeepers:
@@ -193,7 +193,7 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder):
# Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed,
# such that there are zero bytes of ephemeral layer left on the pageserver
log.info("Waiting for background checkpoints...")
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0))
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # type: ignore
# The code below verifies that we do not flush on the first write
# after an idle period longer than the checkpoint timeout.
@@ -210,7 +210,7 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder):
run_worker_for_tenant(env, 5, tenant_with_extra_writes, offset=ENTRIES_PER_TIMELINE)
)
dirty_after_write = wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env))
dirty_after_write = wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore
# We shouldn't flush since we've just opened a new layer
waited_for = 0
@@ -312,4 +312,4 @@ def test_total_size_limit(neon_env_builder: NeonEnvBuilder):
dirty_bytes = get_dirty_bytes(env)
assert dirty_bytes < max_dirty_data
wait_until(compaction_period_s * 2, 1, lambda: assert_dirty_data_limited())
wait_until(compaction_period_s * 2, 1, lambda: assert_dirty_data_limited()) # type: ignore

View File

@@ -702,7 +702,7 @@ def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
else:
timeout = int(deadline - now) + 1
try:
wait_until(timeout, 1, lambda: pageserver.assert_log_contains(expression))
wait_until(timeout, 1, lambda: pageserver.assert_log_contains(expression)) # type: ignore
except:
log.error(f"Timed out waiting for '{expression}'")
raise

View File

@@ -54,7 +54,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
log.info("Clear buffer cache to ensure no stale pages are brought into the cache")
endpoint.clear_buffers(cursor=c)
endpoint.clear_shared_buffers(cursor=c)
cache_entries = query_scalar(
c, f"select count(*) from pg_buffercache where relfilenode = {relfilenode}"

Some files were not shown because too many files have changed in this diff Show More