Compare commits

..

35 Commits

Author SHA1 Message Date
Stas Kelvich
2bf9a350ef Process chunks in parallel 2024-09-14 14:07:35 +01:00
Heikki Linnakangas
2e7e5f4f3a Refactor how the image layer partitioning is done
It got pretty ugly after the last commit. Refactor it so that we first
collect all the key ranges that need to be written out into a list of
tasks, then partition the tasks into image layers, and then write them
out. This will be much easier to parallelize, but that's not included
in this commit yet.
2024-09-13 05:41:35 +03:00
Heikki Linnakangas
9f3d5826be Fix with files > 4 GB 2024-09-13 02:27:26 +03:00
Heikki Linnakangas
4f39501641 Fix handling relations > 1 GB 2024-09-13 02:04:48 +03:00
Heikki Linnakangas
67d1606f82 Write out multiple image layers 2024-09-13 01:47:10 +03:00
Heikki Linnakangas
9351ba26ff Fake LSN
Test passes, yay!
2024-09-12 21:44:49 +03:00
Stas Kelvich
8df388330b Merge branch 'hack/fast-import' of github.com:neondatabase/neon into hack/fast-import 2024-09-12 19:26:16 +01:00
Stas Kelvich
357c07dd35 track rel file import time 2024-09-12 19:26:03 +01:00
Heikki Linnakangas
7b90ec6e19 Create controlfile and checkpoint entries
XXX: untested, not sure if it works..
2024-09-12 21:01:04 +03:00
Heikki Linnakangas
85f4e966e8 Import dummy pg_twophase dir entry 2024-09-12 20:54:16 +03:00
Heikki Linnakangas
4d27048d6d Import SLRUs 2024-09-12 20:46:20 +03:00
Stas Kelvich
3a452d8f56 remove old timeline init code 2024-09-12 18:20:13 +01:00
Stas Kelvich
b81dbc887b import relation sizes 2024-09-12 18:19:25 +01:00
Stas Kelvich
80fed9cfb1 fix oder of insertion for relmaps and reldirs 2024-09-12 15:43:54 +01:00
Stas Kelvich
189386b22f Merge branch 'hack/fast-import' of github.com:neondatabase/neon into hack/fast-import 2024-09-12 13:52:11 +01:00
Stas Kelvich
38dfecb026 clean imports 2024-09-12 13:51:48 +01:00
Stas Kelvich
be28bd8312 merge 2024-09-12 13:49:34 +01:00
Heikki Linnakangas
9759d6ec72 Rename the image layer to not have the temp suffix 2024-09-12 15:49:21 +03:00
Stas Kelvich
0c64d55a6b Import dbdir, relmaps, reldirs 2024-09-12 13:48:29 +01:00
Heikki Linnakangas
578da1dc02 Parse postgres version from control file 2024-09-12 15:21:59 +03:00
Stas Kelvich
842ac7cfda resolve conflicts 2024-09-12 13:13:16 +01:00
Stas Kelvich
71340e3c00 common iterators for pg data dirs 2024-09-12 13:10:35 +01:00
Heikki Linnakangas
e6e0b27dc3 Create index_part.json 2024-09-12 14:53:29 +03:00
Heikki Linnakangas
04ec8bd7de test: Attach the tenant, start endpoint on it
Doesn't work yet, I think because index_part.json is missing
2024-09-12 13:52:14 +03:00
Heikki Linnakangas
6563be1a4c Test passes now
It runs the command successfully. Doesn't try to attach it to the
pageserver on it yet

    BUILD_TYPE=debug DEFAULT_PG_VERSION=16 poetry run pytest --preserve-database-files test_runner/regress/test_pg_import.py
2024-09-12 13:36:42 +03:00
Heikki Linnakangas
fe975acc71 Add --tenant-id and --timeline-id options 2024-09-12 13:28:12 +03:00
Heikki Linnakangas
abed35589b Test fix 2024-09-12 12:59:45 +03:00
Stas Kelvich
3fe8b69968 Merge branch 'hack/fast-import' of github.com:neondatabase/neon into hack/fast-import 2024-09-12 10:59:24 +01:00
Stas Kelvich
0c856443c4 now it produces an image layer 2024-09-12 10:57:50 +01:00
Heikki Linnakangas
0fc584ef9a Add python test 2024-09-12 12:43:12 +03:00
Stas Kelvich
daedec65ac fix awaits 2024-09-12 10:42:08 +01:00
Stas Kelvich
94c393bf8f resolve conflicts 2024-09-12 10:37:07 +01:00
Stas Kelvich
28616b0907 compiles 2024-09-12 10:33:14 +01:00
Heikki Linnakangas
241724f3fc CLI args parsing 2024-09-12 12:31:07 +03:00
Stas Kelvich
98d128d993 first sketch 2024-09-12 09:59:36 +01:00
212 changed files with 3995 additions and 4567 deletions

View File

@@ -1 +0,0 @@
FROM neondatabase/build-tools:pinned

View File

@@ -1,23 +0,0 @@
// https://containers.dev/implementors/json_reference/
{
"name": "Neon",
"build": {
"context": "..",
"dockerfile": "Dockerfile.devcontainer"
},
"postCreateCommand": {
"build neon": "BUILD_TYPE=debug CARGO_BUILD_FLAGS='--features=testing' mold -run make -s -j`nproc`",
"install python deps": "./scripts/pysync"
},
"customizations": {
"vscode": {
"extensions": [
"charliermarsh.ruff",
"github.vscode-github-actions",
"rust-lang.rust-analyzer"
]
}
}
}

View File

@@ -7,13 +7,6 @@ self-hosted-runner:
- small-arm64
- us-east-2
config-variables:
- AZURE_DEV_CLIENT_ID
- AZURE_DEV_REGISTRY_NAME
- AZURE_DEV_SUBSCRIPTION_ID
- AZURE_PROD_CLIENT_ID
- AZURE_PROD_REGISTRY_NAME
- AZURE_PROD_SUBSCRIPTION_ID
- AZURE_TENANT_ID
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB
- REMOTE_STORAGE_AZURE_CONTAINER

View File

@@ -1,56 +0,0 @@
name: Push images to ACR
on:
workflow_call:
inputs:
client_id:
description: Client ID of Azure managed identity or Entra app
required: true
type: string
image_tag:
description: Tag for the container image
required: true
type: string
images:
description: Images to push
required: true
type: string
registry_name:
description: Name of the container registry
required: true
type: string
subscription_id:
description: Azure subscription ID
required: true
type: string
tenant_id:
description: Azure tenant ID
required: true
type: string
jobs:
push-to-acr:
runs-on: ubuntu-22.04
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
steps:
- name: Azure login
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ inputs.client_id }}
subscription-id: ${{ inputs.subscription_id }}
tenant-id: ${{ inputs.tenant_id }}
- name: Login to ACR
run: |
az acr login --name=${{ inputs.registry_name }}
- name: Copy docker images to ACR ${{ inputs.registry_name }}
run: |
images='${{ inputs.images }}'
for image in ${images}; do
docker buildx imagetools create \
-t ${{ inputs.registry_name }}.azurecr.io/neondatabase/${image}:${{ inputs.image_tag }} \
neondatabase/${image}:${{ inputs.image_tag }}
done

View File

@@ -286,7 +286,6 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
SYNC_AFTER_EACH_TEST: true
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
@@ -794,6 +793,9 @@ jobs:
docker compose -f ./docker-compose/docker-compose.yml down
promote-images:
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04
@@ -820,6 +822,28 @@ jobs:
neondatabase/vm-compute-node-${version}:${{ needs.tag.outputs.build-tag }}
done
- name: Azure login
if: github.ref_name == 'main'
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: Login to ACR
if: github.ref_name == 'main'
run: |
az acr login --name=neoneastus2
- name: Copy docker images to ACR-dev
if: github.ref_name == 'main'
run: |
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
docker buildx imagetools create \
-t neoneastus2.azurecr.io/neondatabase/${image}:${{ needs.tag.outputs.build-tag }} \
neondatabase/${image}:${{ needs.tag.outputs.build-tag }}
done
- name: Add latest tag to images
if: github.ref_name == 'main'
run: |
@@ -857,30 +881,6 @@ jobs:
369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
done
push-to-acr-dev:
if: github.ref_name == 'main'
needs: [ tag, promote-images ]
uses: ./.github/workflows/_push-to-acr.yml
with:
client_id: ${{ vars.AZURE_DEV_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
push-to-acr-prod:
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
needs: [ tag, promote-images ]
uses: ./.github/workflows/_push-to-acr.yml
with:
client_id: ${{ vars.AZURE_PROD_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, tag ]
runs-on: ubuntu-22.04
@@ -956,8 +956,8 @@ jobs:
exit 1
deploy:
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait, push-to-acr-dev, push-to-acr-prod ]
if: (github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy') && !failure() && !cancelled()
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest

View File

@@ -7,11 +7,6 @@ on:
pull_request_target:
types:
- opened
workflow_dispatch:
inputs:
github-actor:
description: 'GitHub username. If empty, the username of the current user will be used'
required: false
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
@@ -31,31 +26,12 @@ jobs:
id: check-user
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
ACTOR: ${{ inputs.github-actor || github.actor }}
run: |
expected_error="User does not exist or is not a member of the organization"
output_file=output.txt
for i in $(seq 1 10); do
if gh api "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${ACTOR}" \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" > ${output_file}; then
is_member=true
break
elif grep -q "${expected_error}" ${output_file}; then
is_member=false
break
elif [ $i -eq 10 ]; then
title="Failed to get memmbership status for ${ACTOR}"
message="The latest GitHub API error message: '$(cat ${output_file})'"
echo "::error file=.github/workflows/label-for-external-users.yml,title=${title}::${message}"
exit 1
fi
sleep 1
done
if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then
is_member=true
else
is_member=false
fi
echo "is-member=${is_member}" | tee -a ${GITHUB_OUTPUT}

163
Cargo.lock generated
View File

@@ -915,22 +915,25 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.70.1"
version = "0.65.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
dependencies = [
"bitflags 2.4.1",
"bitflags 1.3.2",
"cexpr",
"clang-sys",
"itertools 0.12.1",
"lazy_static",
"lazycell",
"log",
"prettyplease 0.2.17",
"peeking_take_while",
"prettyplease 0.2.6",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 2.0.52",
"which",
]
[[package]]
@@ -1189,9 +1192,9 @@ dependencies = [
[[package]]
name = "comfy-table"
version = "7.1.1"
version = "6.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
checksum = "6e7b787b0dc42e8111badfdbe4c3059158ccb2db8780352fa1b01e8ccf45cc4d"
dependencies = [
"crossterm",
"strum",
@@ -1246,7 +1249,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -1360,8 +1363,8 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-util",
"toml",
"toml_edit",
"toml 0.7.4",
"toml_edit 0.19.10",
"tracing",
"url",
"utils",
@@ -1485,22 +1488,25 @@ checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
[[package]]
name = "crossterm"
version = "0.27.0"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67"
dependencies = [
"bitflags 2.4.1",
"bitflags 1.3.2",
"crossterm_winapi",
"libc",
"mio",
"parking_lot 0.12.1",
"signal-hook",
"signal-hook-mio",
"winapi",
]
[[package]]
name = "crossterm_winapi"
version = "0.9.1"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b"
checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c"
dependencies = [
"winapi",
]
@@ -2721,12 +2727,6 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "indoc"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5"
[[package]]
name = "infer"
version = "0.2.3"
@@ -2943,6 +2943,12 @@ dependencies = [
"spin 0.5.2",
]
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.150"
@@ -3141,7 +3147,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd01039851e82f8799046eabbb354056283fb265c8ec0996af940f4e85a380ff"
dependencies = [
"serde",
"toml",
"toml 0.8.14",
]
[[package]]
@@ -3657,7 +3663,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"utils",
"workspace_hack",
]
@@ -3695,7 +3701,6 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.26",
"indoc",
"itertools 0.10.5",
"md5",
"metrics",
@@ -3744,7 +3749,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"twox-hash",
"url",
@@ -3761,7 +3766,6 @@ dependencies = [
"bincode",
"byteorder",
"bytes",
"camino",
"chrono",
"const_format",
"enum-map",
@@ -3769,16 +3773,11 @@ dependencies = [
"humantime",
"humantime-serde",
"itertools 0.10.5",
"nix 0.27.1",
"postgres_backend",
"postgres_ffi",
"rand 0.8.5",
"remote_storage",
"reqwest 0.12.4",
"serde",
"serde_json",
"serde_with",
"storage_broker",
"strum",
"strum_macros",
"thiserror",
@@ -3907,9 +3906,8 @@ dependencies = [
[[package]]
name = "parquet"
version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0fbf928021131daaa57d334ca8e3904fe9ae22f73c56244fc7db9b04eedc3d8"
version = "51.0.0"
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
dependencies = [
"ahash",
"bytes",
@@ -3928,9 +3926,8 @@ dependencies = [
[[package]]
name = "parquet_derive"
version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86e9fcfae007533a06b580429a3f7e07cb833ec8aa37c041c16563e7918f057e"
version = "51.0.0"
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
dependencies = [
"parquet",
"proc-macro2",
@@ -3967,6 +3964,12 @@ dependencies = [
"sha2",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "pem"
version = "3.0.3"
@@ -4120,7 +4123,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4133,7 +4136,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -4152,7 +4155,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4264,9 +4267,9 @@ dependencies = [
[[package]]
name = "prettyplease"
version = "0.2.17"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7"
checksum = "3b69d39aab54d069e7f2fe8cb970493e7834601ca2d8c65fd7bbd183578080d1"
dependencies = [
"proc-macro2",
"syn 2.0.52",
@@ -4811,7 +4814,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"utils",
]
@@ -5321,7 +5324,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"tracing-subscriber",
"url",
@@ -5730,6 +5733,17 @@ dependencies = [
"signal-hook-registry",
]
[[package]]
name = "signal-hook-mio"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
dependencies = [
"libc",
"mio",
"signal-hook",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@@ -6042,21 +6056,21 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strum"
version = "0.26.3"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
[[package]]
name = "strum_macros"
version = "0.26.4"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
dependencies = [
"heck 0.5.0",
"heck 0.4.1",
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.52",
"syn 1.0.109",
]
[[package]]
@@ -6067,9 +6081,8 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "svg_fmt"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20e16a0f46cf5fd675563ef54f26e83e20f2366bcf027bcb3cc3ed2b98aaf2ca"
version = "0.4.2"
source = "git+https://github.com/nical/rust_debug?rev=28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4#28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4"
[[package]]
name = "syn"
@@ -6397,7 +6410,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"async-trait",
"byteorder",
@@ -6508,6 +6521,18 @@ dependencies = [
"tracing",
]
[[package]]
name = "toml"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.19.10",
]
[[package]]
name = "toml"
version = "0.8.14"
@@ -6517,7 +6542,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
"toml_edit 0.22.14",
]
[[package]]
@@ -6529,6 +6554,19 @@ dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.19.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739"
dependencies = [
"indexmap 1.9.3",
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.4.6",
]
[[package]]
name = "toml_edit"
version = "0.22.14"
@@ -6539,7 +6577,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
"winnow 0.6.13",
]
[[package]]
@@ -6952,7 +6990,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"tracing-error",
"tracing-subscriber",
@@ -7498,6 +7536,15 @@ version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
[[package]]
name = "winnow"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699"
dependencies = [
"memchr",
]
[[package]]
name = "winnow"
version = "0.6.13"
@@ -7567,7 +7614,6 @@ dependencies = [
"hyper 0.14.26",
"indexmap 1.9.3",
"itertools 0.10.5",
"itertools 0.12.1",
"lazy_static",
"libc",
"log",
@@ -7605,7 +7651,6 @@ dependencies = [
"tokio",
"tokio-rustls 0.24.0",
"tokio-util",
"toml_edit",
"tonic",
"tower",
"tracing",

View File

@@ -64,7 +64,7 @@ aws-types = "1.2.0"
axum = { version = "0.6.20", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.70"
bindgen = "0.65"
bit_field = "0.10.2"
bstr = "1.0"
byteorder = "1.4"
@@ -73,7 +73,7 @@ camino = "1.1.6"
cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive"] }
comfy-table = "7.1"
comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-deque = "0.8.5"
@@ -103,7 +103,6 @@ humantime-serde = "1.1.1"
hyper = "0.14"
tokio-tungstenite = "0.20.0"
indexmap = "2"
indoc = "2"
inotify = "0.10.2"
ipnet = "2.9.0"
itertools = "0.10"
@@ -123,8 +122,8 @@ opentelemetry = "0.20.0"
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.12.0"
parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
parquet_derive = "51.0.0"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.16"
@@ -158,10 +157,11 @@ signal-hook = "0.3"
smallvec = "1.11"
smol_str = { version = "0.2.0", features = ["serde"] }
socket2 = "0.5"
strum = "0.26"
strum_macros = "0.26"
strum = "0.24"
strum_macros = "0.24"
"subtle" = "2.5.0"
svg_fmt = "0.4.3"
# Our PR https://github.com/nical/rust_debug/pull/4 has been merged but no new version released yet
svg_fmt = { git = "https://github.com/nical/rust_debug", rev = "28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4" }
sync_wrapper = "0.1.2"
tar = "0.4"
task-local-extensions = "0.1.4"
@@ -177,8 +177,8 @@ tokio-rustls = "0.25"
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tower-service = "0.3.2"
tracing = "0.1"
@@ -201,21 +201,10 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
# We want to use the 'neon' branch for these, but there's currently one
# incompatible change on the branch. See:
#
# - PR #8076 which contained changes that depended on the new changes in
# the rust-postgres crate, and
# - PR #8654 which reverted those changes and made the code in proxy incompatible
# with the tip of the 'neon' branch again.
#
# When those proxy changes are re-applied (see PR #8747), we can switch using
# the tip of the 'neon' branch again.
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -252,7 +241,11 @@ tonic-build = "0.9"
[patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
# bug fixes for UUID
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master" }
parquet_derive = { git = "https://github.com/apache/arrow-rs", branch = "master" }
################# Binary contents sections

View File

@@ -87,7 +87,6 @@ RUN mkdir -p /data/.neon/ && \
"pg_distrib_dir='/usr/local/'\n" \
"listen_pg_addr='0.0.0.0:6400'\n" \
"listen_http_addr='0.0.0.0:9898'\n" \
"availability_zone='local'\n" \
> /data/.neon/pageserver.toml && \
chown -R neon:neon /data/.neon

View File

@@ -192,7 +192,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.81.0
ENV RUSTC_VERSION=1.80.1
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1
@@ -207,7 +207,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
export PATH="$HOME/.cargo/bin:$PATH" && \
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools rustfmt clippy && \
rustup component add llvm-tools-preview rustfmt clippy && \
cargo install rustfilt --version ${RUSTFILT_VERSION} && \
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} && \
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \

View File

@@ -64,12 +64,6 @@ brew install protobuf openssl flex bison icu4c pkg-config
echo 'export PATH="$(brew --prefix openssl)/bin:$PATH"' >> ~/.zshrc
```
If you get errors about missing `m4` you may have to install it manually:
```
brew install m4
brew link --force m4
```
2. [Install Rust](https://www.rust-lang.org/tools/install)
```
# recommended approach from https://www.rust-lang.org/tools/install

View File

@@ -22,10 +22,9 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Escape a string for including it in a SQL literal.
///
/// Wrapping the result with `E'{}'` or `'{}'` is not required,
/// as it returns a ready-to-use SQL string literal, e.g. `'db'''` or `E'db\\'`.
/// Escape a string for including it in a SQL literal. Wrapping the result
/// with `E'{}'` or `'{}'` is not required, as it returns a ready-to-use
/// SQL string literal, e.g. `'db'''` or `E'db\\'`.
/// See <https://github.com/postgres/postgres/blob/da98d005cdbcd45af563d0c4ac86d0e9772cd15f/src/backend/utils/adt/quote.c#L47>
/// for the original implementation.
pub fn escape_literal(s: &str) -> String {

View File

@@ -640,8 +640,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
}
Some(("branch", branch_match)) => {
let tenant_id = get_tenant_id(branch_match, env)?;
let new_timeline_id =
parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
let new_branch_name = branch_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
@@ -660,6 +658,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
.map(|lsn_str| Lsn::from_str(lsn_str))
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let new_timeline_id = TimelineId::generate();
let storage_controller = StorageController::from_env(env);
let create_req = TimelineCreateRequest {
new_timeline_id,
@@ -1571,6 +1570,7 @@ fn cli() -> Command {
.value_parser(value_parser!(PathBuf))
.value_name("config")
)
.arg(pg_version_arg.clone())
.arg(force_arg)
)
.subcommand(
@@ -1583,7 +1583,6 @@ fn cli() -> Command {
.subcommand(Command::new("branch")
.about("Create a new timeline, using another timeline as a base, copying its data")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))

View File

@@ -165,9 +165,6 @@ pub struct NeonStorageControllerConf {
pub split_threshold: Option<u64>,
pub max_secondary_lag_bytes: Option<u64>,
#[serde(with = "humantime_serde")]
pub heartbeat_interval: Duration,
}
impl NeonStorageControllerConf {
@@ -175,9 +172,6 @@ impl NeonStorageControllerConf {
const DEFAULT_MAX_OFFLINE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
// Very tight heartbeat interval to speed up tests
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
}
impl Default for NeonStorageControllerConf {
@@ -189,7 +183,6 @@ impl Default for NeonStorageControllerConf {
database_url: None,
split_threshold: None,
max_secondary_lag_bytes: None,
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
}
}
}

View File

@@ -75,14 +75,14 @@ impl PageServerNode {
}
}
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::DocumentMut {
toml_edit::DocumentMut::from_str(&format!("id={node_id}")).unwrap()
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::Document {
toml_edit::Document::from_str(&format!("id={node_id}")).unwrap()
}
fn pageserver_init_make_toml(
&self,
conf: NeonLocalInitPageserverConf,
) -> anyhow::Result<toml_edit::DocumentMut> {
) -> anyhow::Result<toml_edit::Document> {
assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully");
// TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656)
@@ -137,9 +137,9 @@ impl PageServerNode {
// Turn `overrides` into a toml document.
// TODO: above code is legacy code, it should be refactored to use toml_edit directly.
let mut config_toml = toml_edit::DocumentMut::new();
let mut config_toml = toml_edit::Document::new();
for fragment_str in overrides {
let fragment = toml_edit::DocumentMut::from_str(&fragment_str)
let fragment = toml_edit::Document::from_str(&fragment_str)
.expect("all fragments in `overrides` are valid toml documents, this function controls that");
for (key, item) in fragment.iter() {
config_toml.insert(key, item.clone());
@@ -181,23 +181,6 @@ impl PageServerNode {
);
io::stdout().flush()?;
// If the config file we got as a CLI argument includes the `availability_zone`
// config, then use that to populate the `metadata.json` file for the pageserver.
// In production the deployment orchestrator does this for us.
let az_id = conf
.other
.get("availability_zone")
.map(|toml| {
let az_str = toml.to_string();
// Trim the (") chars from the toml representation
if az_str.starts_with('"') && az_str.ends_with('"') {
az_str[1..az_str.len() - 1].to_string()
} else {
az_str
}
})
.unwrap_or("local".to_string());
let config = self
.pageserver_init_make_toml(conf)
.context("make pageserver toml")?;
@@ -233,7 +216,6 @@ impl PageServerNode {
let (_http_host, http_port) =
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
let http_port = http_port.unwrap_or(9898);
// Intentionally hand-craft JSON: this acts as an implicit format compat test
// in case the pageserver-side structure is edited, and reflects the real life
// situation: the metadata is written by some other script.
@@ -244,10 +226,7 @@ impl PageServerNode {
postgres_port: self.pg_connection_config.port(),
http_host: "localhost".to_string(),
http_port,
other: HashMap::from([(
"availability_zone_id".to_string(),
serde_json::json!(az_id),
)]),
other: HashMap::new(),
})
.unwrap(),
)

View File

@@ -437,8 +437,6 @@ impl StorageController {
&humantime::Duration::from(self.config.max_offline).to_string(),
"--max-warming-up-interval",
&humantime::Duration::from(self.config.max_warming_up).to_string(),
"--heartbeat-interval",
&humantime::Duration::from(self.config.heartbeat_interval).to_string(),
"--address-for-peers",
&address_for_peers.to_string(),
]

View File

@@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -80,10 +80,7 @@ enum Command {
/// List nodes known to the storage controller
Nodes {},
/// List tenants known to the storage controller
Tenants {
/// If this field is set, it will list the tenants on a specific node
node_id: Option<NodeId>,
},
Tenants {},
/// Create a new tenant in the storage controller, and by extension on pageservers.
TenantCreate {
#[arg(long)]
@@ -339,7 +336,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
availability_zone_id,
availability_zone_id: Some(availability_zone_id),
}),
)
.await?;
@@ -406,41 +403,7 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::Tenants {
node_id: Some(node_id),
} => {
let describe_response = storcon_client
.dispatch::<(), NodeShardResponse>(
Method::GET,
format!("control/v1/node/{node_id}/shards"),
None,
)
.await?;
let shards = describe_response.shards;
let mut table = comfy_table::Table::new();
table.set_header([
"Shard",
"Intended Primary/Secondary",
"Observed Primary/Secondary",
]);
for shard in shards {
table.add_row([
format!("{}", shard.tenant_shard_id),
match shard.is_intended_secondary {
None => "".to_string(),
Some(true) => "Secondary".to_string(),
Some(false) => "Primary".to_string(),
},
match shard.is_observed_secondary {
None => "".to_string(),
Some(true) => "Secondary".to_string(),
Some(false) => "Primary".to_string(),
},
]);
}
println!("{table}");
}
Command::Tenants { node_id: None } => {
Command::Tenants {} => {
let mut resp = storcon_client
.dispatch::<(), Vec<TenantDescribeResponse>>(
Method::GET,

View File

@@ -68,7 +68,6 @@ macro_rules! register_uint_gauge {
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
///
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
/// while holding the lock.
pub fn register_internal(c: Box<dyn Collector>) -> prometheus::Result<()> {

View File

@@ -4,10 +4,6 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
# See pageserver/Cargo.toml
testing = ["dep:nix"]
[dependencies]
serde.workspace = true
serde_with.workspace = true
@@ -27,12 +23,6 @@ thiserror.workspace = true
humantime-serde.workspace = true
chrono = { workspace = true, features = ["serde"] }
itertools.workspace = true
storage_broker.workspace = true
camino = {workspace = true, features = ["serde1"]}
remote_storage.workspace = true
postgres_backend.workspace = true
nix = {workspace = true, optional = true}
reqwest.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -1,28 +1,15 @@
use camino::Utf8PathBuf;
use std::collections::HashMap;
use const_format::formatcp;
#[cfg(test)]
mod tests;
use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
use postgres_backend::AuthType;
use remote_storage::RemoteStorageConfig;
use serde_with::serde_as;
use std::{
collections::HashMap,
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
time::Duration,
};
use utils::logging::LogFormat;
use crate::models::ImageCompressionAlgorithm;
use crate::models::LsnLease;
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
// as a separate structure. This information is not neeed by the pageserver
// itself, it is only used for registering the pageserver with the control
@@ -42,476 +29,3 @@ pub struct NodeMetadata {
#[serde(flatten)]
pub other: HashMap<String, serde_json::Value>,
}
/// `pageserver.toml`
///
/// We use serde derive with `#[serde(default)]` to generate a deserializer
/// that fills in the default values for each config field.
///
/// If there cannot be a static default value because we need to make runtime
/// checks to determine the default, make it an `Option` (which defaults to None).
/// The runtime check should be done in the consuming crate, i.e., `pageserver`.
#[serde_as]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct ConfigToml {
// types mapped 1:1 into the runtime PageServerConfig type
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub availability_zone: Option<String>,
#[serde(with = "humantime_serde")]
pub wait_lsn_timeout: Duration,
#[serde(with = "humantime_serde")]
pub wal_redo_timeout: Duration,
pub superuser: String,
pub page_cache_size: usize,
pub max_file_descriptors: usize,
pub pg_distrib_dir: Option<Utf8PathBuf>,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub http_auth_type: AuthType,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub pg_auth_type: AuthType,
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
pub remote_storage: Option<RemoteStorageConfig>,
pub tenant_config: TenantConfigToml,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub broker_endpoint: storage_broker::Uri,
#[serde(with = "humantime_serde")]
pub broker_keepalive_interval: Duration,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub log_format: LogFormat,
pub concurrent_tenant_warmup: NonZeroUsize,
pub concurrent_tenant_size_logical_size_queries: NonZeroUsize,
#[serde(with = "humantime_serde")]
pub metric_collection_interval: Duration,
pub metric_collection_endpoint: Option<reqwest::Url>,
pub metric_collection_bucket: Option<RemoteStorageConfig>,
#[serde(with = "humantime_serde")]
pub synthetic_size_calculation_interval: Duration,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
pub test_remote_failures: u64,
pub ondemand_download_behavior_treat_error_as_warn: bool,
#[serde(with = "humantime_serde")]
pub background_task_maximum_delay: Duration,
pub control_plane_api: Option<reqwest::Url>,
pub control_plane_api_token: Option<String>,
pub control_plane_emergency_mode: bool,
pub heatmap_upload_concurrency: usize,
pub secondary_download_concurrency: usize,
pub virtual_file_io_engine: Option<crate::models::virtual_file::IoEngineKind>,
pub ingest_batch_size: u64,
pub max_vectored_read_bytes: MaxVectoredReadBytes,
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
#[serde(skip_serializing)]
// TODO(https://github.com/neondatabase/neon/issues/8184): remove after this field is removed from all pageserver.toml's
pub compact_level0_phase1_value_access: serde::de::IgnoredAny,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DiskUsageEvictionTaskConfig {
pub max_usage_pct: utils::serde_percent::Percent,
pub min_avail_bytes: u64,
#[serde(with = "humantime_serde")]
pub period: Duration,
#[cfg(feature = "testing")]
pub mock_statvfs: Option<statvfs::mock::Behavior>,
/// Select sorting for evicted layers
#[serde(default)]
pub eviction_order: EvictionOrder,
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
pub enum Behavior {
Success {
blocksize: u64,
total_blocks: u64,
name_filter: Option<utils::serde_regex::Regex>,
},
#[cfg(feature = "testing")]
Failure { mocked_error: MockedError },
}
#[cfg(feature = "testing")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[allow(clippy::upper_case_acronyms)]
pub enum MockedError {
EIO,
}
#[cfg(feature = "testing")]
impl From<MockedError> for nix::Error {
fn from(e: MockedError) -> Self {
match e {
MockedError::EIO => nix::Error::EIO,
}
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "args")]
pub enum EvictionOrder {
RelativeAccessed {
highest_layer_count_loses_first: bool,
},
}
impl Default for EvictionOrder {
fn default() -> Self {
Self::RelativeAccessed {
highest_layer_count_loses_first: true,
}
}
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetVectoredImpl {
Sequential,
Vectored,
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetImpl {
Legacy,
Vectored,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
/// A tenant's calcuated configuration, which is the result of merging a
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
///
/// For storing and transmitting individual tenant's configuration, see
/// TenantConfOpt.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct TenantConfigToml {
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
// This parameter actually determines L0 layer file size.
pub checkpoint_distance: u64,
// Inmemory layer is also flushed at least once in checkpoint_timeout to
// eventually upload WAL after activity is stopped.
#[serde(with = "humantime_serde")]
pub checkpoint_timeout: Duration,
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub compaction_target_size: u64,
// How often to check if there's compaction work to be done.
// Duration::ZERO means automatic compaction is disabled.
#[serde(with = "humantime_serde")]
pub compaction_period: Duration,
// Level0 delta layer threshold for compaction.
pub compaction_threshold: usize,
pub compaction_algorithm: crate::models::CompactionAlgorithmSettings,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is #of bytes of WAL.
// Page versions older than this are garbage collected away.
pub gc_horizon: u64,
// Interval at which garbage collection is triggered.
// Duration::ZERO means automatic GC is disabled
#[serde(with = "humantime_serde")]
pub gc_period: Duration,
// Delta layer churn threshold to create L1 image layers.
pub image_creation_threshold: usize,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is time.
// Page versions older than this are garbage collected away.
#[serde(with = "humantime_serde")]
pub pitr_interval: Duration,
/// Maximum amount of time to wait while opening a connection to receive wal, before erroring.
#[serde(with = "humantime_serde")]
pub walreceiver_connect_timeout: Duration,
/// Considers safekeepers stalled after no WAL updates were received longer than this threshold.
/// A stalled safekeeper will be changed to a newer one when it appears.
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Duration,
/// Considers safekeepers lagging when their WAL is behind another safekeeper for more than this threshold.
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub eviction_policy: crate::models::EvictionPolicy,
pub min_resident_size_override: Option<u64>,
// See the corresponding metric's help string.
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
/// may be disabled if a Tenant will not have secondary locations: only secondary
/// locations will use the heatmap uploaded by attached locations.
#[serde(with = "humantime_serde")]
pub heatmap_period: Duration,
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
pub lazy_slru_download: bool,
pub timeline_get_throttle: crate::models::ThrottleConfig,
// How much WAL must be ingested before checking again whether a new image layer is required.
// Expresed in multiples of checkpoint distance.
pub image_layer_creation_check_threshold: u8,
/// Switch to a new aux file policy. Switching this flag requires the user has not written any aux file into
/// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions.
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
/// file is written.
pub switch_aux_file_policy: crate::models::AuxFilePolicy,
/// The length for an explicit LSN lease request.
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
#[serde(with = "humantime_serde")]
pub lsn_lease_length: Duration,
/// The length for an implicit LSN lease granted as part of `get_lsn_by_timestamp` request.
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
#[serde(with = "humantime_serde")]
pub lsn_lease_length_for_ts: Duration,
}
pub mod defaults {
use crate::models::ImageCompressionAlgorithm;
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "300 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
pub const DEFAULT_LOG_FORMAT: &str = "plain";
pub const DEFAULT_CONCURRENT_TENANT_WARMUP: usize = 8;
pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize = 1;
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1;
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
ImageCompressionAlgorithm::Zstd { level: Some(1) };
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
}
impl Default for ConfigToml {
fn default() -> Self {
use defaults::*;
Self {
listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
availability_zone: (None),
wait_lsn_timeout: (humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
wal_redo_timeout: (humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
.expect("cannot parse default wal redo timeout")),
superuser: (DEFAULT_SUPERUSER.to_string()),
page_cache_size: (DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: (DEFAULT_MAX_FILE_DESCRIPTORS),
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
http_auth_type: (AuthType::Trust),
pg_auth_type: (AuthType::Trust),
auth_validation_public_key_path: (None),
remote_storage: None,
broker_endpoint: (storage_broker::DEFAULT_ENDPOINT
.parse()
.expect("failed to parse default broker endpoint")),
broker_keepalive_interval: (humantime::parse_duration(
storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
)
.expect("cannot parse default keepalive interval")),
log_format: (LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
concurrent_tenant_warmup: (NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
.expect("Invalid default constant")),
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(1).unwrap(),
metric_collection_interval: (humantime::parse_duration(
DEFAULT_METRIC_COLLECTION_INTERVAL,
)
.expect("cannot parse default metric collection interval")),
synthetic_size_calculation_interval: (humantime::parse_duration(
DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
)
.expect("cannot parse default synthetic size calculation interval")),
metric_collection_endpoint: (DEFAULT_METRIC_COLLECTION_ENDPOINT),
metric_collection_bucket: (None),
disk_usage_based_eviction: (None),
test_remote_failures: (0),
ondemand_download_behavior_treat_error_as_warn: (false),
background_task_maximum_delay: (humantime::parse_duration(
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
)
.unwrap()),
control_plane_api: (None),
control_plane_api_token: (None),
control_plane_emergency_mode: (false),
heatmap_upload_concurrency: (DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
secondary_download_concurrency: (DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
ingest_batch_size: (DEFAULT_INGEST_BATCH_SIZE),
virtual_file_io_engine: None,
max_vectored_read_bytes: (MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
compact_level0_phase1_value_access: Default::default(),
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,
tenant_config: TenantConfigToml::default(),
}
}
}
pub mod tenant_conf_defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
// FIXME the below configs are only used by legacy algorithm. The new algorithm
// has different parameters.
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
crate::models::CompactionAlgorithm::Legacy;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
// If there's a need to decrease this value, first make sure that GC
// doesn't hold a layer map write lock for non-trivial operations.
// Relevant: https://github.com/neondatabase/neon/issues/3394
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
// The default limit on WAL lag should be set to avoid causing disconnects under high throughput
// scenarios: since the broker stats are updated ~1/s, a value of 1GiB should be sufficient for
// throughputs up to 1GiB/s per timeline.
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
// By default ingest enough WAL for two new L0 layers before checking if new image
// image layers should be created.
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
}
impl Default for TenantConfigToml {
fn default() -> Self {
use tenant_conf_defaults::*;
Self {
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
.expect("cannot parse default checkpoint timeout"),
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period"),
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
compaction_algorithm: crate::models::CompactionAlgorithmSettings {
kind: DEFAULT_COMPACTION_ALGORITHM,
},
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval"),
walreceiver_connect_timeout: humantime::parse_duration(
DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
)
.expect("cannot parse default walreceiver connect timeout"),
lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT)
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
eviction_policy: crate::models::EvictionPolicy::NoEviction,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
heatmap_period: Duration::ZERO,
lazy_slru_download: false,
timeline_get_throttle: crate::models::ThrottleConfig::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
switch_aux_file_policy: crate::models::AuxFilePolicy::default_tenant_config(),
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
}
}
}

View File

@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::str::FromStr;
use std::time::{Duration, Instant};
@@ -57,7 +57,7 @@ pub struct NodeRegisterRequest {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub availability_zone_id: String,
pub availability_zone_id: Option<String>,
}
#[derive(Serialize, Deserialize)]
@@ -74,17 +74,6 @@ pub struct TenantPolicyRequest {
pub scheduling: Option<ShardSchedulingPolicy>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
#[serde(flatten)]
pub preferred_az_ids: HashMap<TenantShardId, String>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
pub updated: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
pub shard_id: TenantShardId,
@@ -112,21 +101,6 @@ pub struct TenantDescribeResponse {
pub config: TenantConfig,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
pub node_id: NodeId,
pub shards: Vec<NodeShard>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShard {
pub tenant_shard_id: TenantShardId,
/// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
pub is_observed_secondary: Option<bool>,
/// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
pub is_intended_secondary: Option<bool>,
}
#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
pub id: NodeId,
@@ -158,12 +132,8 @@ pub struct TenantDescribeResponseShard {
pub is_splitting: bool,
pub scheduling_policy: ShardSchedulingPolicy,
pub preferred_az_id: Option<String>,
}
/// Migration request for a given tenant shard to a given node.
///
/// Explicitly migrating a particular shard is a low level operation
/// TODO: higher level "Reschedule tenant" operation where the request
/// specifies some constraints, e.g. asking it to get off particular node(s)

View File

@@ -263,6 +263,15 @@ impl Key {
field5: u8::MAX,
field6: u32::MAX,
};
/// A key slightly smaller than [`Key::MAX`] for use in layer key ranges to avoid them to be confused with L0 layers
pub const NON_L0_MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX - 1,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {

View File

@@ -48,7 +48,7 @@ pub struct ShardedRange<'a> {
// Calculate the size of a range within the blocks of the same relation, or spanning only the
// top page in the previous relation's space.
fn contiguous_range_len(range: &Range<Key>) -> u32 {
pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
debug_assert!(is_contiguous_range(range));
if range.start.field6 == 0xffffffff {
range.end.field6 + 1
@@ -67,7 +67,7 @@ fn contiguous_range_len(range: &Range<Key>) -> u32 {
/// This matters, because:
/// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
/// - Within such ranges, we may calculate distances using simple subtraction of field6.
fn is_contiguous_range(range: &Range<Key>) -> bool {
pub fn is_contiguous_range(range: &Range<Key>) -> bool {
range.start.field1 == range.end.field1
&& range.start.field2 == range.end.field2
&& range.start.field3 == range.end.field3

View File

@@ -6,7 +6,6 @@ pub use utilization::PageserverUtilization;
use std::{
collections::HashMap,
fmt::Display,
io::{BufRead, Read},
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
str::FromStr,
@@ -62,7 +61,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::VariantNames,
strum_macros::EnumVariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
@@ -305,10 +304,8 @@ pub struct TenantConfig {
pub lsn_lease_length_for_ts: Option<String>,
}
/// The policy for the aux file storage.
///
/// It can be switched through `switch_aux_file_policy` tenant config.
/// When the first aux file written, the policy will be persisted in the
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
/// tenant config. When the first aux file written, the policy will be persisted in the
/// `index_part.json` file and has a limited migration path.
///
/// Currently, we only allow the following migration path:
@@ -438,9 +435,7 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImageCompressionAlgorithm {
// Disabled for writes, support decompressing during read path
Disabled,
@@ -475,33 +470,11 @@ impl FromStr for ImageCompressionAlgorithm {
}
}
impl Display for ImageCompressionAlgorithm {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
ImageCompressionAlgorithm::Zstd { level } => {
if let Some(level) = level {
write!(f, "zstd({})", level)
} else {
write!(f, "zstd")
}
}
}
}
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
#[serde(rename_all = "snake_case")]
Direct { max_concurrency: NonZeroUsize },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct EvictionPolicyLayerAccessThreshold {
#[serde(with = "humantime_serde")]
@@ -743,17 +716,12 @@ pub struct TimelineInfo {
pub pg_version: u32,
pub state: TimelineState,
pub is_archived: bool,
pub walreceiver_status: String,
// ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
// Backward compatibility: you will get a JSON not containing the newly-added field.
// Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
// not deny unknown fields by default so it's safe to set the field to some value, though it won't be
// read.
/// The last aux file policy being used on this timeline
pub last_aux_file_policy: Option<AuxFilePolicy>,
pub is_archived: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -898,9 +866,7 @@ pub struct WalRedoManagerStatus {
pub process: Option<WalRedoManagerProcessStatus>,
}
/// The progress of a secondary tenant.
///
/// It is mostly useful when doing a long running download: e.g. initiating
/// The progress of a secondary tenant is mostly useful when doing a long running download: e.g. initiating
/// a download job, timing out while waiting for it to run, and then inspecting this status to understand
/// what's happening.
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
@@ -1685,33 +1651,21 @@ mod tests {
#[test]
fn test_image_compression_algorithm_parsing() {
use ImageCompressionAlgorithm::*;
let cases = [
("disabled", Disabled),
("zstd", Zstd { level: None }),
("zstd(18)", Zstd { level: Some(18) }),
("zstd(-3)", Zstd { level: Some(-3) }),
];
for (display, expected) in cases {
assert_eq!(
ImageCompressionAlgorithm::from_str(display).unwrap(),
expected,
"parsing works"
);
assert_eq!(format!("{expected}"), display, "Display FromStr roundtrip");
let ser = serde_json::to_string(&expected).expect("serialization");
assert_eq!(
serde_json::from_str::<ImageCompressionAlgorithm>(&ser).unwrap(),
expected,
"serde roundtrip"
);
assert_eq!(
serde_json::Value::String(display.to_string()),
serde_json::to_value(expected).unwrap(),
"Display is the serde serialization"
);
}
assert_eq!(
ImageCompressionAlgorithm::from_str("disabled").unwrap(),
Disabled
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd").unwrap(),
Zstd { level: None }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
Zstd { level: Some(18) }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
Zstd { level: Some(-3) }
);
}
}

View File

@@ -89,19 +89,8 @@ impl PageserverUtilization {
/// If a node is currently hosting more work than it can comfortably handle. This does not indicate that
/// it will fail, but it is a strong signal that more work should not be added unless there is no alternative.
///
/// When a node is overloaded, we may override soft affinity preferences and do things like scheduling
/// into a node in a less desirable AZ, if all the nodes in the preferred AZ are overloaded.
pub fn is_overloaded(score: RawScore) -> bool {
// Why the factor of two? This is unscientific but reflects behavior of real systems:
// - In terms of shard counts, a node's preferred max count is a soft limit intended to keep
// startup and housekeeping jobs nice and responsive. We can go to double this limit if needed
// until some more nodes are deployed.
// - In terms of disk space, the node's utilization heuristic assumes every tenant needs to
// hold its biggest timeline fully on disk, which is tends to be an over estimate when
// some tenants are very idle and have dropped layers from disk. In practice going up to
// double is generally better than giving up and scheduling in a sub-optimal AZ.
score >= 2 * Self::UTILIZATION_FULL
score >= Self::UTILIZATION_FULL
}
pub fn adjust_shard_count_max(&mut self, shard_count: u32) {

View File

@@ -69,10 +69,8 @@ impl QueryError {
}
/// Returns true if the given error is a normal consequence of a network issue,
/// or the client closing the connection.
///
/// These errors can happen during normal operations,
/// and don't indicate a bug in our code.
/// or the client closing the connection. These errors can happen during normal
/// operations, and don't indicate a bug in our code.
pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(
@@ -81,16 +79,17 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
pub trait Handler<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might be not what we want after CopyData streaming, but currently we don't
/// care). It will also flush out the output buffer.
fn process_query(
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> impl Future<Output = Result<(), QueryError>>;
) -> Result<(), QueryError>;
/// Called on startup packet receival, allows to process params.
///

View File

@@ -23,6 +23,7 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
struct TestHandler {}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
// return single col 'hey' for any query
async fn process_query(

View File

@@ -7,7 +7,6 @@ use std::fmt;
use url::Host;
/// Parses a string of format either `host:port` or `host` into a corresponding pair.
///
/// The `host` part should be a correct `url::Host`, while `port` (if present) should be
/// a valid decimal u16 of digits only.
pub fn parse_host_port<S: AsRef<str>>(host_port: S) -> Result<(Host, Option<u16>), anyhow::Error> {

View File

@@ -14,7 +14,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
fn include_file(&self, filename: &str) {
// This does the equivalent of passing bindgen::CargoCallbacks
// to the builder .parse_callbacks() method.
let cargo_callbacks = bindgen::CargoCallbacks::new();
let cargo_callbacks = bindgen::CargoCallbacks;
cargo_callbacks.include_file(filename)
}
@@ -121,7 +121,6 @@ fn main() -> anyhow::Result<()> {
.allowlist_type("XLogPageHeaderData")
.allowlist_type("XLogLongPageHeaderData")
.allowlist_var("XLOG_PAGE_MAGIC")
.allowlist_var("PG_MAJORVERSION_NUM")
.allowlist_var("PG_CONTROL_FILE_SIZE")
.allowlist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC")
.allowlist_type("PageHeaderData")

View File

@@ -44,9 +44,6 @@ macro_rules! postgres_ffi {
// Re-export some symbols from bindings
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
pub const ZERO_CHECKPOINT: bytes::Bytes =
bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
}
};
}
@@ -109,107 +106,6 @@ macro_rules! dispatch_pgversion {
};
}
#[macro_export]
macro_rules! enum_pgversion_dispatch {
($name:expr, $typ:ident, $bind:ident, $code:block) => {
enum_pgversion_dispatch!(
name = $name,
bind = $bind,
typ = $typ,
code = $code,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
)
};
(name = $name:expr,
bind = $bind:ident,
typ = $typ:ident,
code = $code:block,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]) => {
match $name {
$(
self::$typ::$variant($bind) => {
use $crate::$md as pgv;
$code
}
),+,
}
};
}
#[macro_export]
macro_rules! enum_pgversion {
{$name:ident, pgv :: $t:ident} => {
enum_pgversion!{
name = $name,
typ = $t,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
}
};
{$name:ident, pgv :: $p:ident :: $t:ident} => {
enum_pgversion!{
name = $name,
path = $p,
typ = $t,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
}
};
{name = $name:ident,
typ = $t:ident,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
pub enum $name {
$($variant ( $crate::$md::$t )),+
}
impl self::$name {
pub fn pg_version(&self) -> u32 {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
})
}
}
$(
impl Into<self::$name> for $crate::$md::$t {
fn into(self) -> self::$name {
self::$name::$variant (self)
}
}
)+
};
{name = $name:ident,
path = $p:ident,
typ = $t:ident,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
pub enum $name {
$($variant ($crate::$md::$p::$t)),+
}
impl $name {
pub fn pg_version(&self) -> u32 {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
})
}
}
$(
impl Into<$name> for $crate::$md::$p::$t {
fn into(self) -> $name {
$name::$variant (self)
}
}
)+
};
}
pub mod pg_constants;
pub mod relfile_utils;

View File

@@ -185,7 +185,7 @@ mod tests {
use super::*;
fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
let toml = input.parse::<toml_edit::DocumentMut>().unwrap();
let toml = input.parse::<toml_edit::Document>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}
@@ -235,31 +235,6 @@ timeout = '5s'";
);
}
#[test]
fn test_storage_class_serde_roundtrip() {
let classes = [
None,
Some(StorageClass::Standard),
Some(StorageClass::IntelligentTiering),
];
for class in classes {
#[derive(Serialize, Deserialize)]
struct Wrapper {
#[serde(
deserialize_with = "deserialize_storage_class",
serialize_with = "serialize_storage_class"
)]
class: Option<StorageClass>,
}
let wrapped = Wrapper {
class: class.clone(),
};
let serialized = serde_json::to_string(&wrapped).unwrap();
let deserialized: Wrapper = serde_json::from_str(&serialized).unwrap();
assert_eq!(class, deserialized.class);
}
}
#[test]
fn test_azure_parsing() {
let toml = "\

View File

@@ -45,8 +45,6 @@ pub use azure_core::Etag;
pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
/// Default concurrency limit for S3 operations
///
/// Currently, sync happens with AWS S3, that has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
@@ -302,9 +300,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
) -> Result<(), TimeTravelError>;
}
/// Data part of an ongoing [`Download`].
///
/// `DownloadStream` is sensitive to the timeout and cancellation used with the original
/// DownloadStream is sensitive to the timeout and cancellation used with the original
/// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
/// with `tokio::io::copy_buf`.
// This has 'static because safekeepers do not use cancellation tokens (yet)

View File

@@ -60,16 +60,3 @@ pub struct TimelineCopyRequest {
pub target_timeline_id: TimelineId,
pub until_lsn: Lsn,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpRequest {
/// bump to
pub term: Option<u64>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpResponse {
// before the request
pub previous_term: u64,
pub current_term: u64,
}

View File

@@ -5,10 +5,9 @@
mod calculation;
pub mod svg;
/// StorageModel is the input to the synthetic size calculation.
///
/// It represents a tree of timelines, with just the information that's needed
/// for the calculation. This doesn't track timeline names or where each timeline
/// StorageModel is the input to the synthetic size calculation. It represents
/// a tree of timelines, with just the information that's needed for the
/// calculation. This doesn't track timeline names or where each timeline
/// begins and ends, for example. Instead, it consists of "points of interest"
/// on the timelines. A point of interest could be the timeline start or end point,
/// the oldest point on a timeline that needs to be retained because of PITR

View File

@@ -5,10 +5,8 @@ use std::{
use metrics::IntCounter;
/// Circuit breakers are for operations that are expensive and fallible.
///
/// If a circuit breaker fails repeatedly, we will stop attempting it for some
/// period of time, to avoid denial-of-service from retries, and
/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly,
/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and
/// to mitigate the log spam from repeated failures.
pub struct CircuitBreaker {
/// An identifier that enables us to log useful errors when a circuit is broken

View File

@@ -1,4 +1,3 @@
use std::os::fd::AsRawFd;
use std::{
borrow::Cow,
fs::{self, File},
@@ -204,27 +203,6 @@ pub fn overwrite(
Ok(())
}
/// Syncs the filesystem for the given file descriptor.
#[cfg_attr(target_os = "macos", allow(unused_variables))]
pub fn syncfs(fd: impl AsRawFd) -> anyhow::Result<()> {
// Linux guarantees durability for syncfs.
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
#[cfg(target_os = "linux")]
{
use anyhow::Context;
nix::unistd::syncfs(fd.as_raw_fd()).context("syncfs")?;
}
#[cfg(target_os = "macos")]
{
// macOS is not a production platform for Neon, don't even bother.
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
compile_error!("Unsupported OS");
}
Ok(())
}
#[cfg(test)]
mod tests {

View File

@@ -249,10 +249,8 @@ macro_rules! id_newtype {
};
}
/// Neon timeline ID.
///
/// They are different from PostgreSQL timeline
/// IDs, but serve a similar purpose: they differentiate
/// Neon timeline IDs are different from PostgreSQL timeline
/// IDs. They serve a similar purpose though: they differentiate
/// between different "histories" of the same cluster. However,
/// PostgreSQL timeline IDs are a bit cumbersome, because they are only
/// 32-bits wide, and they must be in ascending order in any given

View File

@@ -100,9 +100,7 @@ pub enum LockFileRead {
}
/// Open & try to lock the lock file at the given `path`, returning a [handle][`LockFileRead`] to
/// inspect its content.
///
/// It is not an `Err(...)` if the file does not exist or is already locked.
/// inspect its content. It is not an `Err(...)` if the file does not exist or is already locked.
/// Check the [`LockFileRead`] variants for details.
pub fn read_and_hold_lock_file(path: &Utf8Path) -> anyhow::Result<LockFileRead> {
let res = fs::OpenOptions::new().read(true).open(path);

View File

@@ -3,9 +3,9 @@ use std::str::FromStr;
use anyhow::Context;
use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use strum_macros::{EnumString, EnumVariantNames};
#[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
Plain,
@@ -188,7 +188,7 @@ impl Drop for TracingPanicHookGuard {
}
/// Named symbol for our panic hook, which logs the panic.
fn tracing_panic_hook(info: &std::panic::PanicHookInfo) {
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
// following rust 1.66.1 std implementation:
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
let location = info.location();
@@ -274,14 +274,6 @@ impl From<String> for SecretString {
}
}
impl FromStr for SecretString {
type Err = std::convert::Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(s.to_string()))
}
}
impl std::fmt::Debug for SecretString {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[SECRET]")

View File

@@ -8,7 +8,6 @@ use tracing::{trace, warn};
use crate::lsn::Lsn;
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
///
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.

View File

@@ -65,8 +65,6 @@ impl<T> Poison<T> {
}
}
/// Armed pointer to a [`Poison`].
///
/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
/// Once modifications are done, use [`Self::disarm`].
/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned

View File

@@ -13,11 +13,10 @@ pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(pub u8);
/// Combination of ShardNumber and ShardCount.
///
/// For use within the context of a particular tenant, when we need to know which shard we're
/// dealing with, but do not need to know the full ShardIdentity (because we won't be doing
/// any page->shard mapping), and do not need to know the fully qualified TenantShardId.
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
pub shard_number: ShardNumber,

View File

@@ -49,11 +49,12 @@ use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::watch;
/// Rcu allows multiple readers to read and hold onto a value without blocking
/// (for very long).
///
/// Storing to the Rcu updates the value, making new readers immediately see
/// the new value, but it also waits for all current readers to finish.
/// Rcu allows multiple readers to read and hold onto a value without blocking
/// (for very long). Storing to the Rcu updates the value, making new readers
/// immediately see the new value, but it also waits for all current readers to
/// finish.
///
pub struct Rcu<V> {
inner: RwLock<RcuInner<V>>,
}

View File

@@ -5,9 +5,7 @@ use std::sync::{
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
/// `SemaphorePermit`.
///
/// Allows use of `take` which does not require holding an outer mutex guard
/// `SemaphorePermit`, allowing use of `take` which does not require holding an outer mutex guard
/// for the duration of initialization.
///
/// Has no unsafe, builds upon [`tokio::sync::Semaphore`] and [`std::sync::Mutex`].

View File

@@ -10,7 +10,7 @@ pub fn deserialize_item<T>(item: &toml_edit::Item) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let document: toml_edit::DocumentMut = match item {
let document: toml_edit::Document = match item {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()

View File

@@ -7,7 +7,6 @@ pub enum VecMapOrdering {
}
/// Ordered map datastructure implemented in a Vec.
///
/// Append only - can only add keys that are larger than the
/// current max key.
/// Ordering can be adjusted using [`VecMapOrdering`]

View File

@@ -6,10 +6,9 @@ pub enum YieldingLoopError {
Cancelled,
}
/// Helper for long synchronous loops, e.g. over all tenants in the system.
///
/// Periodically yields to avoid blocking the executor, and after resuming
/// checks the provided cancellation token to drop out promptly on shutdown.
/// Helper for long synchronous loops, e.g. over all tenants in the system. Periodically
/// yields to avoid blocking the executor, and after resuming checks the provided
/// cancellation token to drop out promptly on shutdown.
#[inline(always)]
pub async fn yielding_loop<I, T, F>(
interval: usize,
@@ -24,7 +23,7 @@ where
for (i, item) in iter.enumerate() {
visitor(item);
if (i + 1) % interval == 0 {
if i + 1 % interval == 0 {
tokio::task::yield_now().await;
if cancel.is_cancelled() {
return Err(YieldingLoopError::Cancelled);

View File

@@ -4,6 +4,7 @@
use std::{env, path::PathBuf, process::Command};
use anyhow::{anyhow, Context};
use bindgen::CargoCallbacks;
fn main() -> anyhow::Result<()> {
// Tell cargo to invalidate the built crate whenever the wrapper changes
@@ -63,25 +64,16 @@ fn main() -> anyhow::Result<()> {
.map_err(|s| anyhow!("Bad postgres server path {s:?}"))?
};
let unwind_abi_functions = [
"log_internal",
"recovery_download",
"start_streaming",
"finish_sync_safekeepers",
"wait_event_set",
"WalProposerStart",
];
// The bindgen::Builder is the main entry point
// to bindgen, and lets you build up options for
// the resulting bindings.
let mut builder = bindgen::Builder::default()
let bindings = bindgen::Builder::default()
// The input header we would like to generate
// bindings for.
.header("bindgen_deps.h")
// Tell cargo to invalidate the built crate whenever any of the
// included header files changed.
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
.parse_callbacks(Box::new(CargoCallbacks))
.allowlist_type("WalProposer")
.allowlist_type("WalProposerConfig")
.allowlist_type("walproposer_api")
@@ -113,12 +105,7 @@ fn main() -> anyhow::Result<()> {
.allowlist_var("WL_SOCKET_MASK")
.clang_arg("-DWALPROPOSER_LIB")
.clang_arg(format!("-I{pgxn_neon}"))
.clang_arg(format!("-I{inc_server_path}"));
for name in unwind_abi_functions {
builder = builder.override_abi(bindgen::Abi::CUnwind, name);
}
let bindings = builder
.clang_arg(format!("-I{inc_server_path}"))
// Finish the builder and generate the bindings.
.generate()
// Unwrap the Result and panic on failure.

View File

@@ -33,7 +33,7 @@ extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemStat
}
}
extern "C-unwind" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
@@ -187,7 +187,7 @@ extern "C" fn conn_blocking_write(
}
}
extern "C-unwind" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
unsafe {
let callback_data = (*(*(*sk).wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
@@ -272,7 +272,7 @@ extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
}
}
extern "C-unwind" fn wait_event_set(
extern "C" fn wait_event_set(
wp: *mut WalProposer,
timeout: ::std::os::raw::c_long,
event_sk: *mut *mut Safekeeper,
@@ -324,7 +324,7 @@ extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
}
}
extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
@@ -340,7 +340,7 @@ extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, sk: *mut Safekee
}
}
extern "C-unwind" fn log_internal(
extern "C" fn log_internal(
wp: *mut WalProposer,
level: ::std::os::raw::c_int,
line: *const ::std::os::raw::c_char,

View File

@@ -8,7 +8,7 @@ license.workspace = true
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints", "pageserver_api/testing" ]
testing = ["fail/failpoints"]
[dependencies]
anyhow.workspace = true
@@ -101,7 +101,6 @@ procfs.workspace = true
criterion.workspace = true
hex-literal.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
indoc.workspace = true
[[bench]]
name = "bench_layer_map"

View File

@@ -4,7 +4,7 @@ use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{criterion_group, criterion_main, Criterion};
use pageserver::{
config::PageServerConf,
config::{defaults::DEFAULT_IO_BUFFER_ALIGNMENT, PageServerConf},
context::{DownloadBehavior, RequestContext},
l0_flush::{L0FlushConfig, L0FlushGlobalState},
page_cache,
@@ -167,7 +167,7 @@ fn criterion_benchmark(c: &mut Criterion) {
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
page_cache::init(conf.page_cache_size);

View File

@@ -1,20 +1,2 @@
pub mod mgmt_api;
pub mod page_service;
/// For timeline_block_unblock_gc, distinguish the two different operations. This could be a bool.
// If file structure is per-kind not per-feature then where to put this?
#[derive(Clone, Copy)]
pub enum BlockUnblock {
Block,
Unblock,
}
impl std::fmt::Display for BlockUnblock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
BlockUnblock::Block => "block",
BlockUnblock::Unblock => "unblock",
};
f.write_str(s)
}
}

View File

@@ -12,8 +12,6 @@ use utils::{
pub use reqwest::Body as ReqwestBody;
use crate::BlockUnblock;
pub mod util;
#[derive(Debug, Clone)]
@@ -456,20 +454,6 @@ impl Client {
.map_err(Error::ReceiveBody)
}
pub async fn timeline_block_unblock_gc(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
dir: BlockUnblock,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
self.mgmt_api_endpoint,
);
self.request(Method::POST, &uri, ()).await.map(|_| ())
}
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{}/reset",

View File

@@ -4,6 +4,7 @@
use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -147,7 +148,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);

View File

@@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
use clap::Subcommand;
use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
@@ -193,7 +194,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);

View File

@@ -20,13 +20,14 @@ use clap::{Parser, Subcommand};
use index_part::IndexPartCmd;
use layers::LayerCmd;
use pageserver::{
config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use pageserver_api::{config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT, shard::TenantShardId};
use pageserver_api::shard::TenantShardId;
use postgres_ffi::ControlFileData;
use remote_storage::{RemotePath, RemoteStorageConfig};
use tokio_util::sync::CancellationToken;
@@ -174,7 +175,7 @@ async fn main() -> anyhow::Result<()> {
println!("specified prefix '{}' failed validation", cmd.prefix);
return Ok(());
};
let toml_document = toml_edit::DocumentMut::from_str(&cmd.config_toml_str)?;
let toml_document = toml_edit::Document::from_str(&cmd.config_toml_str)?;
let toml_item = toml_document
.get("remote_storage")
.expect("need remote_storage");

View File

@@ -0,0 +1,61 @@
use anyhow;
use camino::Utf8PathBuf;
use clap::Parser;
use pageserver::{pg_import, virtual_file::{self, api::IoEngineKind}};
use utils::id::{TenantId, TimelineId};
use utils::logging::{self, LogFormat, TracingErrorLayerEnablement};
use std::str::FromStr;
//project_git_version!(GIT_VERSION);
#[derive(Parser)]
#[command(
//version = GIT_VERSION,
about = "Utility to import a Postgres data directory directly into image layers",
//long_about = "..."
)]
struct CliOpts {
/// Input Postgres data directory
pgdata: Utf8PathBuf,
/// Path to local dir where the layer files will be stored
dest_path: Utf8PathBuf,
#[arg(long, default_value_t = TenantId::from_str("42424242424242424242424242424242").unwrap())]
tenant_id: TenantId,
#[arg(long, default_value_t = TimelineId::from_str("42424242424242424242424242424242").unwrap())]
timeline_id: TimelineId,
}
fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::Plain,
TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)?;
virtual_file::init(
100,
IoEngineKind::StdFs,
512,
);
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let cli = CliOpts::parse();
rt.block_on(async_main(cli))?;
Ok(())
}
async fn async_main(cli: CliOpts) -> anyhow::Result<()> {
let mut import = pg_import::PgImportEnv::init(&cli.dest_path, cli.tenant_id, cli.timeline_id).await?;
import.import_datadir(&cli.pgdata).await?;
Ok(())
}

View File

@@ -5,7 +5,6 @@
use std::env;
use std::env::{var, VarError};
use std::io::Read;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -37,7 +36,6 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::crashsafe::syncfs;
use utils::failpoint_support;
use utils::logging::TracingErrorLayerEnablement;
use utils::{
@@ -126,6 +124,7 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
// The tenants directory contains all the pageserver local disk state.
@@ -156,7 +155,23 @@ fn main() -> anyhow::Result<()> {
};
let started = Instant::now();
syncfs(dirfd)?;
// Linux guarantees durability for syncfs.
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
#[cfg(target_os = "linux")]
{
use std::os::fd::AsRawFd;
nix::unistd::syncfs(dirfd.as_raw_fd()).context("syncfs")?;
}
#[cfg(target_os = "macos")]
{
// macOS is not a production platform for Neon, don't even bother.
drop(dirfd);
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
compile_error!("Unsupported OS");
}
let elapsed = started.elapsed();
info!(
elapsed_ms = elapsed.as_millis(),
@@ -208,15 +223,27 @@ fn initialize_config(
}
};
let config_file_contents =
std::fs::read_to_string(cfg_file_path).context("read config file from filesystem")?;
let config_toml = serde_path_to_error::deserialize(
toml_edit::de::Deserializer::from_str(&config_file_contents)
.context("build toml deserializer")?,
)
.context("deserialize config toml")?;
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
.context("runtime-validation of config toml")?;
let config: toml_edit::Document = match std::fs::File::open(cfg_file_path) {
Ok(mut f) => {
let md = f.metadata().context("stat config file")?;
if md.is_file() {
let mut s = String::new();
f.read_to_string(&mut s).context("read config file")?;
s.parse().context("parse config file toml")?
} else {
anyhow::bail!("directory entry exists but is not a file: {cfg_file_path}");
}
}
Err(e) => {
anyhow::bail!("open pageserver config: {e}: {cfg_file_path}");
}
};
debug!("Using pageserver toml: {config}");
// Construct the runtime representation
let conf = PageServerConf::parse_and_validate(identity.id, &config, workdir)
.context("Failed to parse pageserver configuration")?;
Ok(Box::leak(Box::new(conf)))
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +1,7 @@
//! Defines [`RequestContext`].
//!
//! It is a structure that we use throughout the pageserver to propagate
//! high-level context from places that _originate_ activity down to the
//! shared code paths at the heart of the pageserver. It's inspired by
//! Golang's `context.Context`.
//! This module defines `RequestContext`, a structure that we use throughout
//! the pageserver to propagate high-level context from places
//! that _originate_ activity down to the shared code paths at the
//! heart of the pageserver. It's inspired by Golang's `context.Context`.
//!
//! For example, in `Timeline::get(page_nr, lsn)` we need to answer the following questions:
//! 1. What high-level activity ([`TaskKind`]) needs this page?

View File

@@ -141,24 +141,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
m.other
);
let az_id = {
let az_id_from_metadata = m
.other
.get("availability_zone_id")
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
match az_id_from_metadata {
Some(az_id) => Some(az_id),
None => {
tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
conf.availability_zone.clone()
}
}
};
if az_id.is_none() {
panic!("Availablity zone id could not be inferred from metadata.json or pageserver config");
}
let az_id = m
.other
.get("availability_zone_id")
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
Some(NodeRegisterRequest {
node_id: conf.id,
@@ -166,7 +152,7 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
listen_pg_port: m.postgres_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
availability_zone_id: az_id.expect("Checked above"),
availability_zone_id: az_id,
})
}
Err(e) => {

View File

@@ -41,15 +41,19 @@
// - The `#[allow(dead_code)]` above various structs are to suppress warnings about only the Debug impl
// reading these fields. We use the Debug impl for semi-structured logging, though.
use std::{sync::Arc, time::SystemTime};
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use anyhow::Context;
use pageserver_api::{config::DiskUsageEvictionTaskConfig, shard::TenantShardId};
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::serde_percent::Percent;
use utils::{completion, id::TimelineId};
use crate::{
@@ -65,9 +69,23 @@ use crate::{
CancellableTask, DiskUsageEvictionTask,
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DiskUsageEvictionTaskConfig {
pub max_usage_pct: Percent,
pub min_avail_bytes: u64,
#[serde(with = "humantime_serde")]
pub period: Duration,
#[cfg(feature = "testing")]
pub mock_statvfs: Option<crate::statvfs::mock::Behavior>,
/// Select sorting for evicted layers
#[serde(default)]
pub eviction_order: EvictionOrder,
}
/// Selects the sort order for eviction candidates *after* per tenant `min_resident_size`
/// partitioning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", content = "args")]
pub enum EvictionOrder {
/// Order the layers to be evicted by how recently they have been accessed relatively within
/// the set of resident layers of a tenant.
@@ -78,22 +96,23 @@ pub enum EvictionOrder {
/// we read tenants is deterministic. If we find the need to use this as `false`, we need
/// to ensure nondeterminism by adding in a random number to break the
/// `relative_last_activity==0.0` ties.
#[serde(default = "default_highest_layer_count_loses_first")]
highest_layer_count_loses_first: bool,
},
}
impl From<pageserver_api::config::EvictionOrder> for EvictionOrder {
fn from(value: pageserver_api::config::EvictionOrder) -> Self {
match value {
pageserver_api::config::EvictionOrder::RelativeAccessed {
highest_layer_count_loses_first,
} => Self::RelativeAccessed {
highest_layer_count_loses_first,
},
impl Default for EvictionOrder {
fn default() -> Self {
Self::RelativeAccessed {
highest_layer_count_loses_first: true,
}
}
}
fn default_highest_layer_count_loses_first() -> bool {
true
}
impl EvictionOrder {
fn sort(&self, candidates: &mut [(EvictionPartition, EvictionCandidate)]) {
use EvictionOrder::*;
@@ -276,7 +295,7 @@ async fn disk_usage_eviction_task_iteration(
storage,
usage_pre,
tenant_manager,
task_config.eviction_order.into(),
task_config.eviction_order,
cancel,
)
.await;
@@ -1238,6 +1257,7 @@ mod filesystem_level_usage {
#[test]
fn max_usage_pct_pressure() {
use super::EvictionOrder;
use super::Usage as _;
use std::time::Duration;
use utils::serde_percent::Percent;
@@ -1249,7 +1269,7 @@ mod filesystem_level_usage {
period: Duration::MAX,
#[cfg(feature = "testing")]
mock_statvfs: None,
eviction_order: pageserver_api::config::EvictionOrder::default(),
eviction_order: EvictionOrder::default(),
},
total_bytes: 100_000,
avail_bytes: 0,

View File

@@ -468,7 +468,7 @@ async fn build_timeline_info_common(
pg_version: timeline.pg_version,
state,
is_archived: Some(is_archived),
is_archived,
walreceiver_status,
@@ -2076,7 +2076,7 @@ async fn disk_usage_eviction_run(
evict_bytes: u64,
#[serde(default)]
eviction_order: pageserver_api::config::EvictionOrder,
eviction_order: crate::disk_usage_eviction_task::EvictionOrder,
}
#[derive(Debug, Clone, Copy, serde::Serialize)]
@@ -2112,7 +2112,7 @@ async fn disk_usage_eviction_run(
&state.remote_storage,
usage,
&state.tenant_manager,
config.eviction_order.into(),
config.eviction_order,
&cancel,
)
.await;

View File

@@ -19,7 +19,6 @@ use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::decode_wal_record;
use crate::walrecord::DecodedWALRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
@@ -311,13 +310,11 @@ async fn import_wal(
let mut nrecords = 0;
let mut modification = tline.begin_modification(last_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
walingest
.ingest_record(decoded, lsn, &mut modification, ctx)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.await?;
WAL_INGEST.records_committed.inc();
@@ -452,12 +449,11 @@ pub async fn import_wal_from_tar(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification(last_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
walingest
.ingest_record(decoded, lsn, &mut modification, ctx)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.await?;
modification.commit(ctx).await?;
last_lsn = lsn;

View File

@@ -1,7 +1,9 @@
use std::{num::NonZeroUsize, sync::Arc};
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
#[serde(rename_all = "snake_case")]
Direct { max_concurrency: NonZeroUsize },
}
@@ -14,16 +16,6 @@ impl Default for L0FlushConfig {
}
}
impl From<pageserver_api::models::L0FlushConfig> for L0FlushConfig {
fn from(config: pageserver_api::models::L0FlushConfig) -> Self {
match config {
pageserver_api::models::L0FlushConfig::Direct { max_concurrency } => {
Self::Direct { max_concurrency }
}
}
}
}
#[derive(Clone)]
pub struct L0FlushGlobalState(Arc<Inner>);

View File

@@ -32,6 +32,7 @@ pub mod virtual_file;
pub mod walingest;
pub mod walrecord;
pub mod walredo;
pub mod pg_import;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;

View File

@@ -9,7 +9,7 @@ use metrics::{
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use strum::{EnumCount, VariantNames};
use strum_macros::{IntoStaticStr, VariantNames};
use strum_macros::{EnumVariantNames, IntoStaticStr};
use tracing::warn;
use utils::id::TimelineId;
@@ -27,7 +27,7 @@ const CRITICAL_OP_BUCKETS: &[f64] = &[
];
// Metrics collected on operations on the storage repository.
#[derive(Debug, VariantNames, IntoStaticStr)]
#[derive(Debug, EnumVariantNames, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum StorageTimeOperation {
#[strum(serialize = "layer flush")]

View File

@@ -1199,6 +1199,7 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,

650
pageserver/src/pg_import.rs Normal file
View File

@@ -0,0 +1,650 @@
use std::fs::metadata;
use anyhow::{bail, ensure, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use itertools::Itertools;
use pageserver_api::{key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY}, reltag::RelTag};
use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, ControlFileData, BLCKSZ};
use tokio::{io::AsyncRead, task::{self, JoinHandle}};
use tracing::debug;
use utils::{id::{NodeId, TenantId, TimelineId}, shard::{ShardCount, ShardNumber, TenantShardId}};
use walkdir::WalkDir;
use crate::{context::{DownloadBehavior, RequestContext}, pgdatadir_mapping::{DbDirectory, RelDirectory}, task_mgr::TaskKind, tenant::storage_layer::ImageLayerWriter};
use crate::pgdatadir_mapping::{SlruSegmentDirectory, TwoPhaseDirectory};
use crate::config::PageServerConf;
use tokio::io::AsyncReadExt;
use crate::tenant::storage_layer::PersistentLayerDesc;
use utils::generation::Generation;
use utils::lsn::Lsn;
use crate::tenant::IndexPart;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{is_contiguous_range, contiguous_range_len};
use pageserver_api::keyspace::singleton_range;
use pageserver_api::reltag::SlruKind;
use pageserver_api::key::{slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, TWOPHASEDIR_KEY, CONTROLFILE_KEY, CHECKPOINT_KEY};
use utils::bin_ser::BeSer;
use std::collections::HashSet;
use std::ops::Range;
pub struct PgImportEnv {
conf: &'static PageServerConf,
tli: TimelineId,
tsi: TenantShardId,
pgdata_lsn: Lsn,
tasks: Vec<AnyImportTask>,
layers: Vec<PersistentLayerDesc>,
}
impl PgImportEnv {
pub async fn init(dstdir: &Utf8Path, tenant_id: TenantId, timeline_id: TimelineId) -> anyhow::Result<PgImportEnv> {
let config = toml_edit::Document::new();
let conf = PageServerConf::parse_and_validate(
NodeId(42),
&config,
dstdir
)?;
let conf = Box::leak(Box::new(conf));
let tsi = TenantShardId {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
};
Ok(PgImportEnv {
conf,
tli: timeline_id,
tsi,
pgdata_lsn: Lsn(0), // Will be filled in later, when the control file is imported
tasks: Vec::new(),
layers: Vec::new(),
})
}
pub async fn import_datadir(&mut self, pgdata_path: &Utf8PathBuf) -> anyhow::Result<()> {
// Read control file
let controlfile_path = pgdata_path.join("global").join("pg_control");
let controlfile_buf = std::fs::read(&controlfile_path)
.with_context(|| format!("reading controlfile: {controlfile_path}"))?;
let control_file = ControlFileData::decode(&controlfile_buf)?;
let pgdata_lsn = Lsn(control_file.checkPoint).align();
let timeline_path = self.conf.timeline_path(&self.tsi, &self.tli);
println!("Importing {pgdata_path} to {timeline_path} as lsn {pgdata_lsn}...");
self.pgdata_lsn = pgdata_lsn;
let datadir = PgDataDir::new(pgdata_path);
// Import dbdir (00:00:00 keyspace)
// This is just constructed here, but will be written to the image layer in the first call to import_db()
let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
dbdirs: datadir.dbs.iter().map(|db| ((db.spcnode, db.dboid), true)).collect(),
})?);
self.tasks.push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
// Import databases (00:spcnode:dbnode keyspace for each db)
for db in datadir.dbs {
self.import_db(&db).await?;
}
// Import SLRUs
// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &pgdata_path.join("pg_xact")).await?;
// pg_multixact/members (01:01 keyspace)
self.import_slru(SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?;
// pg_multixact/offsets (01:02 keyspace)
self.import_slru(SlruKind::MultiXactOffsets, &pgdata_path.join("pg_multixact/offsets")).await?;
// Import pg_twophase.
// TODO: as empty
let twophasedir_buf = TwoPhaseDirectory::ser(
&TwoPhaseDirectory { xids: HashSet::new() }
)?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf))));
// Controlfile, checkpoint
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CONTROLFILE_KEY, Bytes::from(controlfile_buf))));
let checkpoint_buf = control_file.checkPointCopy.encode()?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CHECKPOINT_KEY, checkpoint_buf)));
// Assigns parts of key space to later parallel jobs
let mut last_end_key = Key::MIN;
let mut current_chunk = Vec::new();
let mut current_chunk_size: usize = 0;
let mut parallel_jobs = Vec::new();
for task in std::mem::take(&mut self.tasks).into_iter() {
if current_chunk_size + task.total_size() > 1024*1024*1024 {
let key_range = last_end_key..task.key_range().start;
parallel_jobs.push(ChunkProcessingJob::new(
key_range.clone(),
std::mem::take(&mut current_chunk),
self
));
last_end_key = key_range.end;
current_chunk_size = 0;
}
current_chunk_size += task.total_size();
current_chunk.push(task);
}
parallel_jobs.push(ChunkProcessingJob::new(
last_end_key..Key::NON_L0_MAX,
current_chunk,
self
));
// Start all jobs simultaneosly
// TODO: semaphore?
let mut handles = vec![];
for job in parallel_jobs {
let handle: JoinHandle<anyhow::Result<PersistentLayerDesc>> = task::spawn(async move {
let layerdesc = job.run().await?;
Ok(layerdesc)
});
handles.push(handle);
}
// Wait for all jobs to complete
for handle in handles {
let layerdesc = handle.await??;
self.layers.push(layerdesc);
}
// Create index_part.json file
self.create_index_part(&control_file).await?;
Ok(())
}
async fn import_db(
&mut self,
db: &PgDataDirDb,
) -> anyhow::Result<()> {
debug!(
"Importing database (path={}, tablespace={}, dboid={})",
db.path, db.spcnode, db.dboid
);
// Import relmap (00:spcnode:dbnode:00:*:00)
let relmap_key = relmap_file_key(db.spcnode, db.dboid);
debug!("Constructing relmap entry, key {relmap_key}");
let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?;
let relmap_buf = read_all_bytes(&mut relmap_file).await?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(relmap_key, relmap_buf)));
// Import reldir (00:spcnode:dbnode:00:*:01)
let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
debug!("Constructing reldirs entry, key {reldir_key}");
let reldir_buf = RelDirectory::ser(&RelDirectory {
rels: db.files.iter().map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)).collect(),
})?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(reldir_key, Bytes::from(reldir_buf))));
// Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
// segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
for file in &db.files {
let len = metadata(&file.path)?.len() as usize;
ensure!(len % 8192 == 0);
let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
let start_key = rel_block_to_key(file.rel_tag, start_blk);
let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
self.tasks.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(start_key..end_key, &file.path)));
// Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
if let Some(nblocks) = file.nblocks {
let size_key = rel_size_to_key(file.rel_tag);
//debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
let buf = nblocks.to_le_bytes();
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(size_key, Bytes::from(buf.to_vec()))));
}
}
Ok(())
}
async fn import_slru(
&mut self,
kind: SlruKind,
path: &Utf8PathBuf,
) -> anyhow::Result<()> {
let segments: Vec<(String, u32)> = WalkDir::new(path)
.max_depth(1)
.into_iter()
.filter_map(|entry| {
let entry = entry.ok()?;
let filename = entry.file_name();
let filename = filename.to_string_lossy();
let segno = u32::from_str_radix(&filename, 16).ok()?;
Some((filename.to_string(), segno))
}).collect();
// Write SlruDir
let slrudir_key = slru_dir_to_key(kind);
let segnos: HashSet<u32> = segments.iter().map(|(_path, segno)| { *segno }).collect();
let slrudir = SlruSegmentDirectory {
segments: segnos,
};
let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(slrudir_key, Bytes::from(slrudir_buf))));
for (segpath, segno) in segments {
// SlruSegBlocks for each segment
let p = path.join(Utf8PathBuf::from(segpath));
let file_size = std::fs::metadata(&p)?.len();
ensure!(file_size % 8192 == 0);
let nblocks = u32::try_from(file_size / 8192)?;
let start_key = slru_block_to_key(kind, segno, 0);
let end_key = slru_block_to_key(kind, segno, nblocks);
self.tasks.push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(start_key..end_key, &p)));
// Followed by SlruSegSize
let segsize_key = slru_segment_size_to_key(kind, segno);
let segsize_buf = nblocks.to_le_bytes();
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(segsize_key, Bytes::copy_from_slice(&segsize_buf))));
}
Ok(())
}
async fn create_index_part(&mut self, control_file: &ControlFileData) -> anyhow::Result<()> {
let dstdir = &self.conf.workdir;
let pg_version = match control_file.catalog_version_no {
// thesea are from catversion.h
202107181 => 14,
202209061 => 15,
202307071 => 16,
catversion => { bail!("unrecognized catalog version {catversion}")},
};
let metadata = TimelineMetadata::new(
// FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
// checkpoint record, and prev_record_lsn should point to its beginning.
// We should read the real end of the record from the WAL, but here we
// just fake it.
Lsn(self.pgdata_lsn.0 + 8),
Some(self.pgdata_lsn),
None, // no ancestor
Lsn(0),
self.pgdata_lsn, // latest_gc_cutoff_lsn
self.pgdata_lsn, // initdb_lsn
pg_version,
);
let generation = Generation::none();
let mut index_part = IndexPart::empty(metadata);
for l in self.layers.iter() {
let name = l.layer_name();
let metadata = LayerFileMetadata::new(l.file_size, generation, ShardIndex::unsharded());
if let Some(_) = index_part.layer_metadata.insert(name.clone(), metadata) {
bail!("duplicate layer filename {name}");
}
}
let data = index_part.to_s3_bytes()?;
let path = remote_timeline_client::remote_index_path(&self.tsi, &self.tli, generation);
let path = dstdir.join(path.get_path());
std::fs::write(&path, data)
.context("could not write {path}")?;
Ok(())
}
}
//
// dbdir iteration tools
//
struct PgDataDir {
pub dbs: Vec<PgDataDirDb> // spcnode, dboid, path
}
struct PgDataDirDb {
pub spcnode: u32,
pub dboid: u32,
pub path: Utf8PathBuf,
pub files: Vec<PgDataDirDbFile>
}
struct PgDataDirDbFile {
pub path: Utf8PathBuf,
pub rel_tag: RelTag,
pub segno: u32,
// Cummulative size of the given fork, set only for the last segment of that fork
pub nblocks: Option<usize>,
}
impl PgDataDir {
fn new(datadir_path: &Utf8PathBuf) -> Self {
// Import ordinary databases, DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID, so import them first
// Traverse database in increasing oid order
let mut databases = WalkDir::new(datadir_path.join("base"))
.max_depth(1)
.into_iter()
.filter_map(|entry| {
entry.ok().and_then(|path| {
path.file_name().to_string_lossy().parse::<u32>().ok()
})
})
.sorted()
.map(|dboid| {
PgDataDirDb::new(
datadir_path.join("base").join(dboid.to_string()),
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
datadir_path
)
})
.collect::<Vec<_>>();
// special case for global catalogs
databases.push(PgDataDirDb::new(
datadir_path.join("global"),
postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
0,
datadir_path,
));
databases.sort_by_key(|db| (db.spcnode, db.dboid));
Self {
dbs: databases
}
}
}
impl PgDataDirDb {
fn new(db_path: Utf8PathBuf, spcnode: u32, dboid: u32, datadir_path: &Utf8PathBuf) -> Self {
let mut files: Vec<PgDataDirDbFile> = WalkDir::new(&db_path)
.min_depth(1)
.max_depth(2)
.into_iter()
.filter_map(|entry| {
entry.ok().and_then(|path| {
let relfile = path.file_name().to_string_lossy();
// returns (relnode, forknum, segno)
parse_relfilename(&relfile).ok()
})
})
.sorted()
.map(|(relnode, forknum, segno)| {
let rel_tag = RelTag {
spcnode,
dbnode: dboid,
relnode,
forknum,
};
let path = datadir_path.join(rel_tag.to_segfile_name(segno));
let len = metadata(&path).unwrap().len() as usize;
assert!(len % BLCKSZ as usize == 0);
let nblocks = len / BLCKSZ as usize;
PgDataDirDbFile {
path,
rel_tag,
segno,
nblocks: Some(nblocks), // first non-cummulative sizes
}
})
.collect();
// Set cummulative sizes. Do all of that math here, so that later we could easier
// parallelize over segments and know with which segments we need to write relsize
// entry.
let mut cumulative_nblocks: usize= 0;
let mut prev_rel_tag: Option<RelTag> = None;
for i in 0..files.len() {
if prev_rel_tag == Some(files[i].rel_tag) {
cumulative_nblocks += files[i].nblocks.unwrap();
} else {
cumulative_nblocks = files[i].nblocks.unwrap();
}
files[i].nblocks = if i == files.len() - 1 || files[i+1].rel_tag != files[i].rel_tag {
Some(cumulative_nblocks)
} else {
None
};
prev_rel_tag = Some(files[i].rel_tag);
}
PgDataDirDb {
files,
path: db_path,
spcnode,
dboid,
}
}
}
async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> anyhow::Result<Bytes> {
let mut buf: Vec<u8> = vec![];
reader.read_to_end(&mut buf).await?;
Ok(Bytes::from(buf))
}
trait ImportTask {
fn key_range(&self) -> Range<Key>;
fn total_size(&self) -> usize {
if is_contiguous_range(&self.key_range()) {
contiguous_range_len(&self.key_range()) as usize * 8192
} else {
u32::MAX as usize
}
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()>;
}
struct ImportSingleKeyTask {
key: Key,
buf: Bytes,
}
impl ImportSingleKeyTask {
fn new(key: Key, buf: Bytes) -> Self {
ImportSingleKeyTask { key, buf }
}
}
impl ImportTask for ImportSingleKeyTask {
fn key_range(&self) -> Range<Key> {
singleton_range(self.key)
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
layer_writer.put_image(self.key, self.buf, ctx).await?;
Ok(())
}
}
struct ImportRelBlocksTask {
key_range: Range<Key>,
path: Utf8PathBuf,
}
impl ImportRelBlocksTask {
fn new(key_range: Range<Key>, path: &Utf8Path) -> Self {
ImportRelBlocksTask {
key_range,
path: path.into()
}
}
}
impl ImportTask for ImportRelBlocksTask {
fn key_range(&self) -> Range<Key> {
self.key_range.clone()
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
debug!("Importing relation file {}", self.path);
let mut reader = tokio::fs::File::open(&self.path).await?;
let mut buf: [u8; 8192] = [0u8; 8192];
let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
let (_rel_tag, end_blk) = self.key_range.end.to_rel_block()?;
let mut blknum = start_blk;
while blknum < end_blk {
reader.read_exact(&mut buf).await?;
let key = rel_block_to_key(rel_tag.clone(), blknum);
layer_writer.put_image(key, Bytes::copy_from_slice(&buf), ctx).await?;
blknum += 1;
}
Ok(())
}
}
struct ImportSlruBlocksTask {
key_range: Range<Key>,
path: Utf8PathBuf,
}
impl ImportSlruBlocksTask {
fn new(key_range: Range<Key>, path: &Utf8Path) -> Self {
ImportSlruBlocksTask {
key_range,
path: path.into()
}
}
}
impl ImportTask for ImportSlruBlocksTask {
fn key_range(&self) -> Range<Key> {
self.key_range.clone()
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
debug!("Importing SLRU segment file {}", self.path);
let mut reader = tokio::fs::File::open(&self.path).await
.context(format!("opening {}", &self.path))?;
let mut buf: [u8; 8192] = [0u8; 8192];
let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
let mut blknum = start_blk;
while blknum < end_blk {
reader.read_exact(&mut buf).await?;
let key = slru_block_to_key(kind, segno, blknum);
layer_writer.put_image(key, Bytes::copy_from_slice(&buf), ctx).await?;
blknum += 1;
}
Ok(())
}
}
enum AnyImportTask {
SingleKey(ImportSingleKeyTask),
RelBlocks(ImportRelBlocksTask),
SlruBlocks(ImportSlruBlocksTask),
}
impl ImportTask for AnyImportTask {
fn key_range(&self) -> Range<Key> {
match self {
Self::SingleKey(t) => t.key_range(),
Self::RelBlocks(t) => t.key_range(),
Self::SlruBlocks(t) => t.key_range()
}
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
match self {
Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
}
}
}
impl From<ImportSingleKeyTask> for AnyImportTask {
fn from(t: ImportSingleKeyTask) -> Self {
Self::SingleKey(t)
}
}
impl From<ImportRelBlocksTask> for AnyImportTask {
fn from(t: ImportRelBlocksTask) -> Self {
Self::RelBlocks(t)
}
}
impl From<ImportSlruBlocksTask> for AnyImportTask {
fn from(t: ImportSlruBlocksTask) -> Self {
Self::SlruBlocks(t)
}
}
struct ChunkProcessingJob {
range: Range<Key>,
tasks: Vec<AnyImportTask>,
dstdir: Utf8PathBuf,
tenant_id: TenantId,
timeline_id: TimelineId,
pgdata_lsn: Lsn,
}
impl ChunkProcessingJob {
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &PgImportEnv) -> Self {
assert!(env.pgdata_lsn.is_valid());
Self {
range,
tasks,
dstdir: env.conf.workdir.clone(),
tenant_id: env.tsi.tenant_id,
timeline_id: env.tli,
pgdata_lsn: env.pgdata_lsn,
}
}
async fn run(self) -> anyhow::Result<PersistentLayerDesc> {
let ctx: RequestContext = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let config = toml_edit::Document::new();
let conf: &'static PageServerConf = Box::leak(Box::new(PageServerConf::parse_and_validate(
NodeId(42),
&config,
&self.dstdir
)?));
let tsi = TenantShardId {
tenant_id: self.tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
};
let mut layer = ImageLayerWriter::new(
&conf,
self.timeline_id,
tsi,
&self.range,
self.pgdata_lsn,
&ctx,
).await?;
for task in self.tasks {
task.doit(&mut layer, &ctx).await?;
}
let layerdesc = layer.finish_raw(&ctx).await?;
Ok(layerdesc)
}
}

View File

@@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*};
use anyhow::{ensure, Context};
use anyhow::{bail, ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use pageserver_api::key::{
@@ -168,9 +168,7 @@ impl Timeline {
DatadirModification {
tline: self,
pending_lsns: Vec::new(),
pending_metadata_pages: HashMap::new(),
pending_data_pages: Vec::new(),
pending_zero_data_pages: Default::default(),
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_directory_entries: Vec::new(),
@@ -729,12 +727,8 @@ impl Timeline {
let current_policy = self.last_aux_file_policy.load();
match current_policy {
Some(AuxFilePolicy::V1) => {
let res = self.list_aux_files_v1(lsn, ctx).await?;
let empty_str = if res.is_empty() { ", empty" } else { "" };
warn!(
"this timeline is using deprecated aux file policy V1 (policy=v1{empty_str})"
);
Ok(res)
warn!("this timeline is using deprecated aux file policy V1 (policy=V1)");
self.list_aux_files_v1(lsn, ctx).await
}
None => {
let res = self.list_aux_files_v1(lsn, ctx).await?;
@@ -1021,10 +1015,9 @@ impl Timeline {
}
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository.
///
/// It is created by the 'begin_record' function. It is called for each WAL
/// record, so that all the modifications by a one WAL record appear atomic.
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
/// by a one WAL record appear atomic.
pub struct DatadirModification<'a> {
/// The timeline this modification applies to. You can access this to
/// read the state, but note that any pending updates are *not* reflected
@@ -1038,24 +1031,10 @@ pub struct DatadirModification<'a> {
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
pending_lsns: Vec<Lsn>,
pending_updates: HashMap<Key, Vec<(Lsn, usize, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
/// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
/// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
/// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
/// which keys are stored here.
pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>,
// Sometimes during ingest, for example when extending a relation, we would like to write a zero page. However,
// if we encounter a write from postgres in the same wal record, we will drop this entry.
//
// Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed
// at the end of each wal record, and all these writes implicitly are at lsn Self::lsn
pending_zero_data_pages: HashSet<CompactKey>,
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
@@ -1079,10 +1058,6 @@ impl<'a> DatadirModification<'a> {
self.pending_bytes
}
pub(crate) fn has_dirty_data_pages(&self) -> bool {
(!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty())
}
/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
@@ -1091,10 +1066,6 @@ impl<'a> DatadirModification<'a> {
lsn,
self.lsn
);
// If we are advancing LSN, then state from previous wal record should have been flushed.
assert!(self.pending_zero_data_pages.is_empty());
if lsn > self.lsn {
self.pending_lsns.push(self.lsn);
self.lsn = lsn;
@@ -1102,17 +1073,6 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
/// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
/// keys that represent literal blocks that postgres can read. So data includes relation blocks and
/// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
///
/// The distinction is important because data keys are handled on a fast path where dirty writes are
/// not readable until this modification is committed, whereas metadata keys are visible for read
/// via [`Self::get`] as soon as their record has been ingested.
fn is_data_key(key: &Key) -> bool {
key.is_rel_block_key() || key.is_slru_block_key()
}
/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -1205,13 +1165,6 @@ impl<'a> DatadirModification<'a> {
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
}
@@ -1223,63 +1176,10 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(key, Value::Image(img));
self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
Ok(())
}
pub(crate) fn put_rel_page_image_zero(
&mut self,
rel: RelTag,
blknum: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
pub(crate) fn put_slru_page_image_zero(
&mut self,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
/// Call this at the end of each WAL record.
pub(crate) fn on_record_end(&mut self) {
let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages);
for key in pending_zero_data_pages {
self.put_data(key, Value::Image(ZERO_PAGE.clone()));
}
}
/// Store a relmapper file (pg_filenode.map) in the repository
pub async fn put_relmap_file(
&mut self,
@@ -1697,7 +1597,7 @@ impl<'a> DatadirModification<'a> {
if aux_files_key_v1.is_empty() {
None
} else {
warn!("this timeline is using deprecated aux file policy V1 (detected existing v1 files)");
warn!("this timeline is using deprecated aux file policy V1");
self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
Some(AuxFilePolicy::V1)
}
@@ -1878,7 +1778,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1889,11 +1789,31 @@ impl<'a> DatadirModification<'a> {
let mut writer = self.tline.writer().await;
// Flush relation and SLRU data blocks, keep metadata.
let pending_data_pages = std::mem::take(&mut self.pending_data_pages);
let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
for (key, values) in self.pending_updates.drain() {
if !key.is_valid_key_on_write_path() {
bail!(
"the request contains data not supported by pageserver at TimelineWriter::put: {}", key
);
}
let mut write_batch = Vec::new();
for (lsn, value_ser_size, value) in values {
if key.is_rel_block_key() || key.is_slru_block_key() {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
write_batch.push((key.to_compact(), lsn, value_ser_size, value));
} else {
retained_pending_updates.entry(key).or_default().push((
lsn,
value_ser_size,
value,
));
}
}
writer.put_batch(write_batch, ctx).await?;
}
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put_batch(pending_data_pages, ctx).await?;
self.pending_updates = retained_pending_updates;
self.pending_bytes = 0;
if pending_nblocks != 0 {
@@ -1914,31 +1834,29 @@ impl<'a> DatadirModification<'a> {
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
// Commit should never be called mid-wal-record
assert!(self.pending_zero_data_pages.is_empty());
let mut writer = self.tline.writer().await;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
// Ordering: the items in this batch do not need to be in any global order, but values for
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
// this to do efficient updates to its index.
let mut write_batch = std::mem::take(&mut self.pending_data_pages);
write_batch.extend(
self.pending_metadata_pages
if !self.pending_updates.is_empty() {
// Ordering: the items in this batch do not need to be in any global order, but values for
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
// this to do efficient updates to its index.
let batch: Vec<(CompactKey, Lsn, usize, Value)> = self
.pending_updates
.drain()
.flat_map(|(key, values)| {
values
.into_iter()
.map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
}),
);
values.into_iter().map(move |(lsn, val_ser_size, value)| {
if !key.is_valid_key_on_write_path() {
bail!("the request contains data not supported by pageserver at TimelineWriter::put: {}", key);
}
Ok((key.to_compact(), lsn, val_ser_size, value))
})
})
.collect::<anyhow::Result<Vec<_>>>()?;
if !write_batch.is_empty() {
writer.put_batch(write_batch, ctx).await?;
writer.put_batch(batch, ctx).await?;
}
if !self.pending_deletions.is_empty() {
@@ -1969,58 +1887,33 @@ impl<'a> DatadirModification<'a> {
}
pub(crate) fn len(&self) -> usize {
self.pending_metadata_pages.len()
+ self.pending_data_pages.len()
+ self.pending_deletions.len()
self.pending_updates.len() + self.pending_deletions.len()
}
/// Read a page from the Timeline we are writing to. For metadata pages, this passes through
/// a cache in Self, which makes writes earlier in this modification visible to WAL records later
/// in the modification.
///
/// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
/// page must ensure that the pages they read are already committed in Timeline, for example
/// DB create operations are always preceded by a call to commit(). This is special cased because
/// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
/// and not data pages.
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
if !Self::is_data_key(&key) {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
if let Some((_, _, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::Other(anyhow::anyhow!(
"unexpected pending WAL record"
)))
};
}
}
} else {
// This is an expensive check, so we only do it in debug mode. If reading a data key,
// this key should never be present in pending_data_pages. We ensure this by committing
// modifications before ingesting DB create operations, which are the only kind that reads
// data pages during ingest.
if cfg!(debug_assertions) {
for (dirty_key, _, _, _) in &self.pending_data_pages {
debug_assert!(&key.to_compact() != dirty_key);
}
// Internal helper functions to batch the modifications
debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact()))
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some(values) = self.pending_updates.get(&key) {
if let Some((_, _, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::Other(anyhow::anyhow!(
"unexpected pending WAL record"
)))
};
}
}
// Metadata page cache miss, or we're reading a data page.
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
self.tline.get(key, lsn, ctx).await
}
@@ -2032,40 +1925,11 @@ impl<'a> DatadirModification<'a> {
}
fn put(&mut self, key: Key, val: Value) {
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)
} else {
self.put_metadata(key.to_compact(), val)
}
}
fn put_data(&mut self, key: CompactKey, val: Value) {
let val_serialized_size = val.serialized_size().unwrap() as usize;
// If this page was previously zero'd in the same WalRecord, then drop the previous zero page write. This
// is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend),
// and the subsequent postgres-originating write
if self.pending_zero_data_pages.remove(&key) {
self.pending_bytes -= ZERO_PAGE.len();
}
self.pending_bytes += val_serialized_size;
self.pending_data_pages
.push((key, self.lsn, val_serialized_size, val))
}
fn put_metadata(&mut self, key: CompactKey, val: Value) {
let values = self.pending_metadata_pages.entry(key).or_default();
let values = self.pending_updates.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
if *last_lsn == self.lsn {
// Update the pending_bytes contribution from this entry, and update the serialized size in place
self.pending_bytes -= *last_value_ser_size;
*last_value_ser_size = val.serialized_size().unwrap() as usize;
self.pending_bytes += *last_value_ser_size;
// Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
// have been generated by synthesized zero page writes prior to the first real write to a page.
*last_value = val;
return;
}
@@ -2084,7 +1948,6 @@ impl<'a> DatadirModification<'a> {
/// This struct facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
///
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the
@@ -2119,23 +1982,23 @@ impl<'a> Version<'a> {
//--- Metadata structs stored in key-value pairs in the repository.
#[derive(Debug, Serialize, Deserialize)]
struct DbDirectory {
pub struct DbDirectory {
// (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
dbdirs: HashMap<(Oid, Oid), bool>,
pub dbdirs: HashMap<(Oid, Oid), bool>,
}
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectory {
xids: HashSet<TransactionId>,
pub(crate) struct TwoPhaseDirectory {
pub(crate) xids: HashSet<TransactionId>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
struct RelDirectory {
pub struct RelDirectory {
// Set of relations that exist. (relfilenode, forknum)
//
// TODO: Store it as a btree or radix tree or something else that spans multiple
// key-value pairs, if you have a lot of relations
rels: HashSet<(Oid, u8)>,
pub rels: HashSet<(Oid, u8)>,
}
#[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
@@ -2159,9 +2022,9 @@ struct RelSizeEntry {
}
#[derive(Debug, Serialize, Deserialize, Default)]
struct SlruSegmentDirectory {
pub(crate) struct SlruSegmentDirectory {
// Set of SLRU segments that exist.
segments: HashSet<u32>,
pub(crate) segments: HashSet<u32>,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]

View File

@@ -60,7 +60,32 @@ pub mod mock {
use regex::Regex;
use tracing::log::info;
pub use pageserver_api::config::statvfs::mock::Behavior;
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
pub enum Behavior {
Success {
blocksize: u64,
total_blocks: u64,
name_filter: Option<utils::serde_regex::Regex>,
},
Failure {
mocked_error: MockedError,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[allow(clippy::upper_case_acronyms)]
pub enum MockedError {
EIO,
}
impl From<MockedError> for nix::Error {
fn from(e: MockedError) -> Self {
match e {
MockedError::EIO => nix::Error::EIO,
}
}
}
pub fn get(tenants_dir: &Utf8Path, behavior: &Behavior) -> nix::Result<Statvfs> {
info!("running mocked statvfs");
@@ -91,7 +116,6 @@ pub mod mock {
block_size: *blocksize,
})
}
#[cfg(feature = "testing")]
Behavior::Failure { mocked_error } => Err((*mocked_error).into()),
}
}

View File

@@ -1,9 +1,8 @@
//! Timeline repository implementation that keeps old data in layer files, and
//! the recent changes in ephemeral files.
//!
//! See tenant/*_layer.rs files. The functions here are responsible for locating
//! the correct layer for the get/put call, walking back the timeline branching
//! history as needed.
//! Timeline repository implementation that keeps old data in files on disk, and
//! the recent changes in memory. See tenant/*_layer.rs files.
//! The functions here are responsible for locating the correct layer for the
//! get/put call, walking back the timeline branching history as needed.
//!
//! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
//! directory. See docs/pageserver-storage.md for how the files are managed.
@@ -7091,13 +7090,13 @@ mod tests {
vec![
// Image layer at GC horizon
PersistentLayerKey {
key_range: Key::MIN..Key::MAX,
key_range: Key::MIN..Key::NON_L0_MAX,
lsn_range: Lsn(0x30)..Lsn(0x31),
is_delta: false
},
// The delta layer below the horizon
// The delta layer covers the full range (with the layer key hack to avoid being recognized as L0)
PersistentLayerKey {
key_range: get_key(3)..get_key(4),
key_range: Key::MIN..Key::NON_L0_MAX,
lsn_range: Lsn(0x30)..Lsn(0x48),
is_delta: true
},

View File

@@ -9,10 +9,11 @@
//! may lead to a data loss.
//!
use anyhow::bail;
pub(crate) use pageserver_api::config::TenantConfigToml as TenantConf;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::CompactionAlgorithm;
use pageserver_api::models::CompactionAlgorithmSettings;
use pageserver_api::models::EvictionPolicy;
use pageserver_api::models::LsnLease;
use pageserver_api::models::{self, ThrottleConfig};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use serde::de::IntoDeserializer;
@@ -22,6 +23,50 @@ use std::num::NonZeroU64;
use std::time::Duration;
use utils::generation::Generation;
pub mod defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
// FIXME the below configs are only used by legacy algorithm. The new algorithm
// has different parameters.
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
super::CompactionAlgorithm::Legacy;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
// If there's a need to decrease this value, first make sure that GC
// doesn't hold a layer map write lock for non-trivial operations.
// Relevant: https://github.com/neondatabase/neon/issues/3394
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
// The default limit on WAL lag should be set to avoid causing disconnects under high throughput
// scenarios: since the broker stats are updated ~1/s, a value of 1GiB should be sufficient for
// throughputs up to 1GiB/s per timeline.
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
// By default ingest enough WAL for two new L0 layers before checking if new image
// image layers should be created.
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum AttachmentMode {
/// Our generation is current as far as we know, and as far as we know we are the only attached
@@ -236,20 +281,96 @@ impl LocationConf {
}
}
impl Default for LocationConf {
// TODO: this should be removed once tenant loading can guarantee that we are never
// loading from a directory without a configuration.
// => tech debt since https://github.com/neondatabase/neon/issues/1555
fn default() -> Self {
Self {
mode: LocationMode::Attached(AttachedLocationConfig {
generation: Generation::none(),
attach_mode: AttachmentMode::Single,
}),
tenant_conf: TenantConfOpt::default(),
shard: ShardIdentity::unsharded(),
}
}
/// A tenant's calcuated configuration, which is the result of merging a
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
///
/// For storing and transmitting individual tenant's configuration, see
/// TenantConfOpt.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
// This parameter actually determines L0 layer file size.
pub checkpoint_distance: u64,
// Inmemory layer is also flushed at least once in checkpoint_timeout to
// eventually upload WAL after activity is stopped.
#[serde(with = "humantime_serde")]
pub checkpoint_timeout: Duration,
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub compaction_target_size: u64,
// How often to check if there's compaction work to be done.
// Duration::ZERO means automatic compaction is disabled.
#[serde(with = "humantime_serde")]
pub compaction_period: Duration,
// Level0 delta layer threshold for compaction.
pub compaction_threshold: usize,
pub compaction_algorithm: CompactionAlgorithmSettings,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is #of bytes of WAL.
// Page versions older than this are garbage collected away.
pub gc_horizon: u64,
// Interval at which garbage collection is triggered.
// Duration::ZERO means automatic GC is disabled
#[serde(with = "humantime_serde")]
pub gc_period: Duration,
// Delta layer churn threshold to create L1 image layers.
pub image_creation_threshold: usize,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is time.
// Page versions older than this are garbage collected away.
#[serde(with = "humantime_serde")]
pub pitr_interval: Duration,
/// Maximum amount of time to wait while opening a connection to receive wal, before erroring.
#[serde(with = "humantime_serde")]
pub walreceiver_connect_timeout: Duration,
/// Considers safekeepers stalled after no WAL updates were received longer than this threshold.
/// A stalled safekeeper will be changed to a newer one when it appears.
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Duration,
/// Considers safekeepers lagging when their WAL is behind another safekeeper for more than this threshold.
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub eviction_policy: EvictionPolicy,
pub min_resident_size_override: Option<u64>,
// See the corresponding metric's help string.
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
/// may be disabled if a Tenant will not have secondary locations: only secondary
/// locations will use the heatmap uploaded by attached locations.
#[serde(with = "humantime_serde")]
pub heatmap_period: Duration,
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
pub lazy_slru_download: bool,
pub timeline_get_throttle: pageserver_api::models::ThrottleConfig,
// How much WAL must be ingested before checking again whether a new image layer is required.
// Expresed in multiples of checkpoint distance.
pub image_layer_creation_check_threshold: u8,
/// Switch to a new aux file policy. Switching this flag requires the user has not written any aux file into
/// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions.
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
/// file is written.
pub switch_aux_file_policy: AuxFilePolicy,
/// The length for an explicit LSN lease request.
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
#[serde(with = "humantime_serde")]
pub lsn_lease_length: Duration,
/// The length for an implicit LSN lease granted as part of `get_lsn_by_timestamp` request.
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
#[serde(with = "humantime_serde")]
pub lsn_lease_length_for_ts: Duration,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -424,6 +545,51 @@ impl TenantConfOpt {
}
}
impl Default for TenantConf {
fn default() -> Self {
use defaults::*;
Self {
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
.expect("cannot parse default checkpoint timeout"),
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period"),
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
compaction_algorithm: CompactionAlgorithmSettings {
kind: DEFAULT_COMPACTION_ALGORITHM,
},
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval"),
walreceiver_connect_timeout: humantime::parse_duration(
DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
)
.expect("cannot parse default walreceiver connect timeout"),
lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT)
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
eviction_policy: EvictionPolicy::NoEviction,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
heatmap_period: Duration::ZERO,
lazy_slru_download: false,
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
switch_aux_file_policy: AuxFilePolicy::default_tenant_config(),
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
}
}
}
impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
type Error = anyhow::Error;
@@ -452,8 +618,7 @@ impl TryFrom<toml_edit::Item> for TenantConfOpt {
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
}
toml_edit::Item::Table(table) => {
let deserializer =
toml_edit::de::Deserializer::from(toml_edit::DocumentMut::from(table));
let deserializer = toml_edit::de::Deserializer::new(table.into());
return serde_path_to_error::deserialize(deserializer)
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
}

View File

@@ -1,8 +1,7 @@
//! Describes the legacy now hopefully no longer modified per-timeline metadata.
//!
//! It is stored in `index_part.json` managed by [`remote_timeline_client`]. For many tenants and
//! their timelines, this struct and its original serialization format is still needed because
//! they were written a long time ago.
//! Describes the legacy now hopefully no longer modified per-timeline metadata stored in
//! `index_part.json` managed by [`remote_timeline_client`]. For many tenants and their timelines,
//! this struct and it's original serialization format is still needed because they were written a
//! long time ago.
//!
//! Instead of changing and adding versioning to this, just change [`IndexPart`] with soft json
//! versioning.

View File

@@ -282,10 +282,9 @@ impl BackgroundPurges {
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
/// Responsible for storing and mutating the collection of all tenants
/// that this pageserver has state for.
///
/// Every Tenant and SecondaryTenant instance lives inside the TenantManager.
/// The TenantManager is responsible for storing and mutating the collection of all tenants
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
/// lives inside the TenantManager.
///
/// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
/// the same tenant twice concurrently, or trying to configure the same tenant into secondary
@@ -2347,9 +2346,8 @@ pub enum TenantMapError {
ShuttingDown,
}
/// Guards a particular tenant_id's content in the TenantsMap.
///
/// While this structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
/// Guards a particular tenant_id's content in the TenantsMap. While this
/// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
/// for this tenant, which acts as a marker for any operations targeting
/// this tenant to retry later, or wait for the InProgress state to end.
///

View File

@@ -2184,8 +2184,6 @@ pub fn remote_timeline_path(
remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
}
/// Obtains the path of the given Layer in the remote
///
/// Note that the shard component of a remote layer path is _not_ always the same
/// as in the TenantShardId of the caller: tenants may reference layers from a different
/// ShardIndex. Use the ShardIndex from the layer's metadata.

View File

@@ -548,7 +548,7 @@ pub(crate) async fn download_initdb_tar_zst(
cancel,
)
.await
.inspect_err(|_e| {
.map_err(|e| {
// Do a best-effort attempt at deleting the temporary file upon encountering an error.
// We don't have async here nor do we want to pile on any extra errors.
if let Err(e) = std::fs::remove_file(&temp_path) {
@@ -556,6 +556,7 @@ pub(crate) async fn download_initdb_tar_zst(
warn!("error deleting temporary file {temp_path}: {e}");
}
}
e
})?;
Ok((temp_path, file))

View File

@@ -1,5 +1,4 @@
//! In-memory index to track the tenant files on the remote storage.
//!
//! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about
//! remote timeline layers and its metadata.

View File

@@ -434,11 +434,10 @@ impl ReadableLayer {
}
}
/// Layers contain a hint indicating whether they are likely to be used for reads.
///
/// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
/// when changing the visibility of layers (for example when creating a branch that makes some previously
/// covered layers visible). It should be used for cache management but not for correctness-critical checks.
/// Layers contain a hint indicating whether they are likely to be used for reads. This is a hint rather
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should
/// be used for cache management but not for correctness-critical checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LayerVisibilityHint {
/// A Visible layer might be read while serving a read, because there is not an image layer between it

View File

@@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadCoalesceMode, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
@@ -52,7 +52,6 @@ use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
@@ -136,11 +135,10 @@ impl Summary {
// Flag indicating that this version initialize the page
const WILL_INIT: u64 = 1;
/// Struct representing reference to BLOB in layers.
///
/// Reference contains BLOB offset, and for WAL records it also contains
/// `will_init` flag. The flag helps to determine the range of records
/// that needs to be applied, without reading/deserializing records themselves.
/// Struct representing reference to BLOB in layers. Reference contains BLOB
/// offset, and for WAL records it also contains `will_init` flag. The flag
/// helps to determine the range of records that needs to be applied, without
/// reading/deserializing records themselves.
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(pub u64);

View File

@@ -1,9 +1,7 @@
//! An ImageLayer represents an image or a snapshot of a key-range at
//! one particular LSN.
//!
//! It contains an image of all key-value pairs in its key-range. Any key
//! that falls into the image layer's range but does not exist in the layer,
//! does not exist.
//! one particular LSN. It contains an image of all key-value pairs
//! in its key-range. Any key that falls into the image layer's range
//! but does not exist in the layer, does not exist.
//!
//! An image layer is stored in a file on disk. The file is stored in
//! timelines/<timeline_id> directory. Currently, there are no
@@ -36,7 +34,8 @@ use crate::tenant::disk_btree::{
};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
@@ -47,7 +46,6 @@ use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
@@ -798,12 +796,11 @@ impl ImageLayerWriterInner {
///
/// Finish writing the image layer.
///
async fn finish(
async fn finish_layer(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<ResidentLayer> {
) -> anyhow::Result<PersistentLayerDesc> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -879,8 +876,22 @@ impl ImageLayerWriterInner {
// fsync the file
file.sync_all().await?;
Ok(desc)
}
async fn finish(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<ResidentLayer> {
let path = self.path.clone();
let conf = self.conf;
let desc = self.finish_layer(ctx, end_key).await?;
// FIXME: why not carry the virtualfile here, it supports renaming?
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
let layer = Layer::finish_creating(conf, timeline, desc, &path)?;
info!("created image layer {}", layer.local_path());
@@ -969,6 +980,32 @@ impl ImageLayerWriter {
self.inner.take().unwrap().finish(timeline, ctx, None).await
}
/// Like finish(), but doesn't create the ResidentLayer struct. This can be used
/// by utilities that don't have a full-blown Timeline.
pub(crate) async fn finish_raw(
mut self,
ctx: &RequestContext,
) -> anyhow::Result<super::PersistentLayerDesc> {
let inner = self.inner.take().unwrap();
let name = ImageLayerName {
key_range: inner.key_range.clone(),
lsn: inner.lsn,
};
let temp_path = inner.path.clone();
let final_path = inner.conf.timeline_path(&inner.tenant_shard_id, &inner.timeline_id)
.join(name.to_string());
let desc = inner.finish_layer(ctx, None).await?;
// Rename the file to final name like Layer::finish_creating() does
utils::fs_ext::rename_noreplace(temp_path.as_std_path(), final_path.as_std_path())
.with_context(|| format!("rename temporary file as {final_path}"))?;
Ok(desc)
}
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
pub(super) async fn finish_with_end_key(
mut self,

View File

@@ -215,7 +215,7 @@ impl IndexEntry {
const _ASSERT_DEFAULT_CHECKPOINT_DISTANCE_IS_VALID: () = {
let res = Self::validate_checkpoint_distance(
pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE,
crate::tenant::config::defaults::DEFAULT_CHECKPOINT_DISTANCE,
);
if res.is_err() {
panic!("default checkpoint distance is valid")
@@ -692,13 +692,8 @@ impl InMemoryLayer {
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
if old.is_some() {
// This should not break anything, but is unexpected: ingestion code aims to filter out
// multiple writes to the same key at the same LSN. This happens in cases where our
// ingenstion code generates some write like an empty page, and we see a write from postgres
// to the same key in the same wal record. If one such write makes it through, we
// index the most recent write, implicitly ignoring the earlier write. We log a warning
// because this case is unexpected, and we would like tests to fail if this happens.
warn!("Key {} at {} written twice at same LSN", key, lsn);
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
}
}

View File

@@ -12,10 +12,8 @@ use serde::{Deserialize, Serialize};
#[cfg(test)]
use utils::id::TenantId;
/// A unique identifier of a persistent layer.
///
/// This is different from `LayerDescriptor`, which is only used in the benchmarks.
/// This struct contains all necessary information to find the image / delta layer. It also provides
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
/// a unified way to generate layer information like file name.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)]
pub struct PersistentLayerDesc {

View File

@@ -217,9 +217,8 @@ impl fmt::Display for ImageLayerName {
}
}
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time.
///
/// The LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time. The
/// LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
/// over time (e.g. across shard splits or compression). The physical filenames of layers in local
/// storage and object names in remote storage consist of the LayerName plus some extra qualifiers
/// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path])

View File

@@ -226,11 +226,9 @@ impl<'a> IteratorWrapper<'a> {
}
}
/// A merge iterator over delta/image layer iterators.
///
/// When duplicated records are found, the iterator will not perform any
/// deduplication, and the caller should handle these situation. By saying
/// duplicated records, there are many possibilities:
/// A merge iterator over delta/image layer iterators. When duplicated records are
/// found, the iterator will not perform any deduplication, and the caller should handle
/// these situation. By saying duplicated records, there are many possibilities:
///
/// * Two same delta at the same LSN.
/// * Two same image at the same LSN.

View File

@@ -34,10 +34,9 @@ impl SplitWriterResult {
}
}
/// An image writer that takes images and produces multiple image layers.
///
/// The interface does not guarantee atomicity (i.e., if the image layer generation
/// fails, there might be leftover files to be cleaned up)
/// An image writer that takes images and produces multiple image layers. The interface does not
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
/// to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
@@ -188,23 +187,22 @@ impl SplitImageLayerWriter {
.await
}
/// This function will be deprecated with #8841.
/// When split writer fails, the caller should call this function and handle partially generated layers.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
///
/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
/// there might be leftover files to be cleaned up).
/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
/// to be cleaned up).
///
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
inner: Option<(Key, DeltaLayerWriter)>,
inner: DeltaLayerWriter,
target_layer_size: u64,
generated_layers: Vec<SplitWriterResult>,
conf: &'static PageServerConf,
@@ -212,6 +210,7 @@ pub struct SplitDeltaLayerWriter {
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
last_key_written: Key,
start_key: Key,
}
impl SplitDeltaLayerWriter {
@@ -219,18 +218,29 @@ impl SplitDeltaLayerWriter {
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_key: Key,
lsn_range: Range<Lsn>,
target_layer_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: None,
inner: DeltaLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
start_key,
lsn_range.clone(),
ctx,
)
.await?,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn_range,
last_key_written: Key::MIN,
start_key,
})
}
@@ -253,26 +263,9 @@ impl SplitDeltaLayerWriter {
//
// Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
// strategy. https://github.com/neondatabase/neon/issues/8837
if self.inner.is_none() {
self.inner = Some((
key,
DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
key,
self.lsn_range.clone(),
ctx,
)
.await?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
if inner.num_keys() >= 1
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
if key != self.last_key_written {
let next_delta_writer = DeltaLayerWriter::new(
@@ -284,13 +277,13 @@ impl SplitDeltaLayerWriter {
ctx,
)
.await?;
let (start_key, prev_delta_writer) =
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
let layer_key = PersistentLayerKey {
key_range: start_key..key,
key_range: self.start_key..key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
self.start_key = key;
if discard(&layer_key).await {
drop(prev_delta_writer);
self.generated_layers
@@ -301,18 +294,17 @@ impl SplitDeltaLayerWriter {
self.generated_layers
.push(SplitWriterResult::Produced(delta_layer));
}
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
} else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
inner.estimated_size()
self.inner.estimated_size()
);
}
}
self.last_key_written = key;
let (_, inner) = self.inner.as_mut().unwrap();
inner.put_value(key, lsn, val, ctx).await
self.inner.put_value(key, lsn, val, ctx).await
}
pub async fn put_value(
@@ -331,6 +323,7 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
discard: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
where
@@ -342,15 +335,11 @@ impl SplitDeltaLayerWriter {
inner,
..
} = self;
let Some((start_key, inner)) = inner else {
return Ok(generated_layers);
};
if inner.num_keys() == 0 {
return Ok(generated_layers);
}
let end_key = self.last_key_written.next();
let layer_key = PersistentLayerKey {
key_range: start_key..end_key,
key_range: self.start_key..end_key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
@@ -369,14 +358,15 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<SplitWriterResult>> {
self.finish_with_discard_fn(tline, ctx, |_| async { false })
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
Ok((self.generated_layers, self.inner.map(|x| x.1)))
/// When split writer fails, the caller should call this function and handle partially generated layers.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
@@ -440,8 +430,10 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -466,22 +458,11 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
assert_eq!(layers.len(), 1);
assert_eq!(
layers
.into_iter()
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}
#[tokio::test]
@@ -518,8 +499,10 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -548,7 +531,10 @@ mod tests {
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
if discard {
for layer in image_layers {
layer.into_discarded_layer();
@@ -567,14 +553,6 @@ mod tests {
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(
delta_layers.first().unwrap().layer_desc().key_range.start,
get_key(0)
);
assert_eq!(
delta_layers.last().unwrap().layer_desc().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
@@ -622,8 +600,10 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&ctx,
)
.await
.unwrap();
@@ -662,35 +642,11 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
assert_eq!(layers.len(), 2);
let mut layers_iter = layers.into_iter();
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(1)..get_key(2),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}
#[tokio::test]
@@ -710,8 +666,10 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -729,20 +687,10 @@ mod tests {
.await
.unwrap();
}
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
assert_eq!(delta_layers.len(), 1);
let delta_layer = delta_layers
.into_iter()
.next()
.unwrap()
.into_resident_layer();
assert_eq!(
delta_layer.layer_desc().key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
is_delta: true
}
);
}
}

View File

@@ -10,6 +10,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::config::defaults::DEFAULT_COMPACTION_PERIOD;
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
@@ -455,11 +456,9 @@ async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken
// If compaction period is set to zero (to disable it), then we will use a reasonable default
let period = if period == Duration::ZERO {
humantime::Duration::from_str(
pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD,
)
.unwrap()
.into()
humantime::Duration::from_str(DEFAULT_COMPACTION_PERIOD)
.unwrap()
.into()
} else {
period
};

View File

@@ -66,6 +66,7 @@ use std::{
use crate::{
aux_file::AuxFileSizeEstimator,
tenant::{
config::defaults::DEFAULT_PITR_INTERVAL,
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
@@ -101,7 +102,6 @@ use crate::{
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
virtual_file::{MaybeFatalIo, VirtualFile},
};
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
@@ -2243,7 +2243,7 @@ impl Timeline {
};
if aux_file_policy == Some(AuxFilePolicy::V1) {
warn!("this timeline is using deprecated aux file policy V1 (when loading the timeline)");
warn!("this timeline is using deprecated aux file policy V1");
}
result.repartition_threshold =

View File

@@ -29,6 +29,7 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::split_writer::{
@@ -42,9 +43,6 @@ use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::DeltaLayer;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
};
use crate::keyspace::KeySpace;
use crate::repository::{Key, Value};
@@ -911,13 +909,137 @@ impl Timeline {
// we're compacting, in key, LSN order.
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
// then the Value::Image is ordered before Value::WalRecord.
let mut all_values_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
//
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
// option and validation code once we've reached confidence.
enum AllValuesIter<'a> {
PageCachedBlobIo {
all_keys_iter: VecIter<'a>,
},
StreamingKmergeBypassingPageCache {
merge_iter: MergeIterator<'a>,
},
ValidatingStreamingKmergeBypassingPageCache {
mode: CompactL0BypassPageCacheValidation,
merge_iter: MergeIterator<'a>,
all_keys_iter: VecIter<'a>,
},
}
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
impl AllValuesIter<'_> {
async fn next_all_keys_iter(
iter: &mut VecIter<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
let Some(DeltaEntry {
key,
lsn,
val: value_ref,
..
}) = iter.next()
else {
return Ok(None);
};
let value = value_ref.load(ctx).await?;
Ok(Some((*key, *lsn, value)))
}
async fn next(
&mut self,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
match self {
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
Self::next_all_keys_iter(iter, ctx).await
}
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
// advance both iterators
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
let merge_iter_item = merge_iter.next().await;
// compare results & log warnings as needed
macro_rules! rate_limited_warn {
($($arg:tt)*) => {{
if cfg!(debug_assertions) || cfg!(feature = "testing") {
warn!($($arg)*);
panic!("CompactL0BypassPageCacheValidation failure, check logs");
}
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
use std::sync::Mutex;
use std::time::Duration;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!($($arg)*);
});
}}
}
match (&all_keys_iter_item, &merge_iter_item) {
(Err(_), Err(_)) => {
// don't bother asserting equivality of the errors
}
(Err(all_keys), Ok(merge)) => {
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
},
(Ok(all_keys), Err(merge)) => {
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
},
(Ok(None), Ok(None)) => { }
(Ok(Some(all_keys)), Ok(None)) => {
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
}
(Ok(None), Ok(Some(merge))) => {
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
}
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
match mode {
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
CompactL0BypassPageCacheValidation::KeyLsn => {
let all_keys = (all_keys_key, all_keys_lsn);
let merge = (merge_key, merge_lsn);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
}
}
CompactL0BypassPageCacheValidation::KeyLsnValue => {
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
let merge = (merge_key, merge_lsn, merge_value);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
}
}
}
}
}
// in case of mismatch, trust the legacy all_keys_iter_item
all_keys_iter_item
}.instrument(info_span!("next")).await
}
}
}
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
all_keys_iter: all_keys.iter(),
},
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
let merge_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
match validate {
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
mode: validate.clone(),
merge_iter,
all_keys_iter: all_keys.iter(),
},
}
}
MergeIterator::create(&deltas, &[], ctx)
};
// This iterator walks through all keys and is needed to calculate size used by each key
@@ -994,7 +1116,7 @@ impl Timeline {
let mut keys = 0;
while let Some((key, lsn, value)) = all_values_iter
.next()
.next(ctx)
.await
.map_err(CompactionError::Other)?
{
@@ -1311,6 +1433,43 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum CompactL0Phase1ValueAccess {
/// The old way.
PageCachedBlobIo,
/// The new way.
StreamingKmerge {
/// If set, we run both the old way and the new way, validate that
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
/// and if the validation fails,
/// - in tests: fail them with a panic or
/// - in prod, log a rate-limited warning and use the old way's results.
///
/// If not set, we only run the new way and trust its results.
validate: Option<CompactL0BypassPageCacheValidation>,
},
}
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum CompactL0BypassPageCacheValidation {
/// Validate that the series of (key, lsn) pairs are the same.
KeyLsn,
/// Validate that the entire output of old and new way is identical.
KeyLsnValue,
}
impl Default for CompactL0Phase1ValueAccess {
fn default() -> Self {
CompactL0Phase1ValueAccess::StreamingKmerge {
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
}
}
}
impl Timeline {
/// Entry point for new tiered compaction algorithm.
///
@@ -1809,6 +1968,7 @@ impl Timeline {
.unwrap();
// We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
// as an L0 layer.
let hack_end_key = Key::NON_L0_MAX;
let mut delta_layers = Vec::new();
let mut image_layers = Vec::new();
let mut downloaded_layers = Vec::new();
@@ -1854,8 +2014,10 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
Key::MIN,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
ctx,
)
.await?;
@@ -1962,7 +2124,7 @@ impl Timeline {
let produced_image_layers = if let Some(writer) = image_layer_writer {
if !dry_run {
writer
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.await?
} else {
let (layers, _) = writer.take()?;
@@ -1975,7 +2137,7 @@ impl Timeline {
let produced_delta_layers = if !dry_run {
delta_layer_writer
.finish_with_discard_fn(self, ctx, discard)
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.await?
} else {
let (layers, _) = delta_layer_writer.take()?;

View File

@@ -31,7 +31,7 @@ use crate::{
task_mgr::{TaskKind, WALRECEIVER_RUNTIME},
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
walingest::WalIngest,
walrecord::{decode_wal_record, DecodedWALRecord},
walrecord::DecodedWALRecord,
};
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
@@ -312,25 +312,10 @@ pub(super) async fn handle_walreceiver_connection(
waldecoder.feed_bytes(data);
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(startlsn);
let mut uncommitted_records = 0;
let mut filtered_records = 0;
async fn commit(
modification: &mut DatadirModification<'_>,
uncommitted: &mut u64,
filtered: &mut u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
WAL_INGEST
.records_committed
.inc_by(*uncommitted - *filtered);
modification.commit(ctx).await?;
*uncommitted = 0;
*filtered = 0;
Ok(())
}
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
@@ -339,28 +324,9 @@ pub(super) async fn handle_walreceiver_connection(
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
}
// Deserialize WAL record
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, modification.tline.pg_version)?;
if decoded.is_dbase_create_copy(timeline.pg_version)
&& uncommitted_records > 0
{
// Special case: legacy PG database creations operate by reading pages from a 'template' database:
// these are the only kinds of WAL record that require reading data blocks while ingesting. Ensure
// all earlier writes of data blocks are visible by committing any modification in flight.
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
// Ingest the records without immediately committing them.
let ingested = walingest
.ingest_record(decoded, lsn, &mut modification, &ctx)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;
if !ingested {
@@ -383,25 +349,21 @@ pub(super) async fn handle_walreceiver_connection(
|| modification.approx_pending_bytes()
> DatadirModification::MAX_PENDING_BYTES
{
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
WAL_INGEST
.records_committed
.inc_by(uncommitted_records - filtered_records);
modification.commit(&ctx).await?;
uncommitted_records = 0;
filtered_records = 0;
}
}
// Commit the remaining records.
if uncommitted_records > 0 {
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
WAL_INGEST
.records_committed
.inc_by(uncommitted_records - filtered_records);
modification.commit(&ctx).await?;
}
}

View File

@@ -16,6 +16,7 @@
//! Note that the vectored blob api does *not* go through the page cache.
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use bytes::BytesMut;
use pageserver_api::key::Key;
@@ -28,6 +29,9 @@ use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::{self, VirtualFile};
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
/// Metadata bundled with the start and end offset of a blob.
#[derive(Copy, Clone, Debug)]
pub struct BlobMeta {
@@ -593,10 +597,8 @@ impl<'a> VectoredBlobReader<'a> {
}
}
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
///
/// It provides a streaming API for getting read blobs. It returns a batch when
/// `handle` gets called and when the current key would just exceed the read_size and
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
/// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
/// max_cnt constraints.
pub struct StreamingVectoredReadPlanner {
read_builder: Option<VectoredReadBuilder>,

View File

@@ -1,7 +1,6 @@
//! VirtualFile is like a normal File, but it's not bound directly to
//! a file descriptor.
//!
//! Instead, the file is opened when it's read from,
//! VirtualFile is like a normal File, but it's not bound directly to
//! a file descriptor. Instead, the file is opened when it's read from,
//! and if too many files are open globally in the system, least-recently
//! used ones are closed.
//!
@@ -11,6 +10,7 @@
//! This is similar to PostgreSQL's virtual file descriptor facility in
//! src/backend/storage/file/fd.c
//!
use crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use crate::context::RequestContext;
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
@@ -19,7 +19,6 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use owned_buffers_io::io_buf_ext::FullSlice;
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver_api::shard::TenantShardId;
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};

View File

@@ -84,14 +84,9 @@ pub(crate) fn get() -> IoEngine {
}
},
Err(std::env::VarError::NotPresent) => {
#[cfg(target_os = "linux")]
{
IoEngineKind::TokioEpollUring
}
#[cfg(not(target_os = "linux"))]
{
IoEngineKind::StdFs
}
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
.parse()
.unwrap()
}
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {env_var_name} is not unicode");

View File

@@ -25,7 +25,9 @@ use std::time::Duration;
use std::time::SystemTime;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::TimestampTz;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::{bail, Context, Result};
@@ -46,29 +48,13 @@ use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
enum_pgversion! {CheckPoint, pgv::CheckPoint}
impl CheckPoint {
fn encode(&self) -> Result<Bytes, SerializeError> {
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
}
fn update_next_xid(&mut self, xid: u32) -> bool {
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
}
pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
enum_pgversion_dispatch!(self, CheckPoint, cp, {
cp.update_next_multixid(multi_xid, multi_offset)
})
}
}
pub struct WalIngest {
shard: ShardIdentity,
checkpoint: CheckPoint,
@@ -91,13 +77,8 @@ impl WalIngest {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
let pgversion = timeline.pg_version;
let checkpoint = dispatch_pgversion!(pgversion, {
let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
<pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
});
let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
Ok(WalIngest {
shard: *timeline.get_shard_identity(),
@@ -123,9 +104,10 @@ impl WalIngest {
///
pub async fn ingest_record(
&mut self,
decoded: DecodedWALRecord,
recdata: Bytes,
lsn: Lsn,
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
) -> anyhow::Result<bool> {
WAL_INGEST.records_received.inc();
@@ -133,12 +115,7 @@ impl WalIngest {
let prev_len = modification.len();
modification.set_lsn(lsn)?;
if decoded.is_dbase_create_copy(pg_version) {
// Records of this type should always be preceded by a commit(), as they
// rely on reading data pages back from the Timeline.
assert!(!modification.has_dirty_data_pages());
}
decode_wal_record(recdata, decoded, pg_version)?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -156,11 +133,11 @@ impl WalIngest {
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
// Heap AM records need some special handling, because they modify VM pages
// without registering them with the standard mechanism.
self.ingest_heapam_record(&mut buf, modification, &decoded, ctx)
self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
.await?;
}
pg_constants::RM_NEON_ID => {
self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx)
self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
.await?;
}
// Handle other special record types
@@ -348,73 +325,76 @@ impl WalIngest {
}
pg_constants::RM_RELMAP_ID => {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, &decoded, ctx)
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
.await?;
}
pg_constants::RM_XLOG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if cp.nextOid != next_oid {
cp.nextOid = next_oid;
self.checkpoint_modified = true;
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
cp.oldestXid
);
if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
cp.oldestXid = xlog_checkpoint.oldestXid;
}
trace!(
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
xlog_checkpoint.oldestActiveXid,
cp.oldestActiveXid
);
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = cp.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
cp.oldestActiveXid = oldest_active_xid;
} else {
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
// have some trace of the checkpoint records in the layer files at the same
// LSNs.
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if self.checkpoint.nextOid != next_oid {
self.checkpoint.nextOid = next_oid;
self.checkpoint_modified = true;
}
});
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
self.checkpoint.oldestXid
);
if (self
.checkpoint
.oldestXid
.wrapping_sub(xlog_checkpoint.oldestXid) as i32)
< 0
{
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
}
trace!(
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
self.checkpoint.oldestActiveXid = oldest_active_xid;
} else {
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
// have some trace of the checkpoint records in the layer files at the same
// LSNs.
self.checkpoint_modified = true;
}
}
pg_constants::RM_LOGICALMSG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
@@ -438,11 +418,7 @@ impl WalIngest {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestActiveXid = xlrec.oldest_running_xid;
});
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
self.checkpoint_modified = true;
}
}
@@ -494,7 +470,7 @@ impl WalIngest {
continue;
}
self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
.await?;
}
@@ -510,8 +486,6 @@ impl WalIngest {
// until commit() is called to flush the data into the repository and update
// the latest LSN.
modification.on_record_end();
Ok(modification.len() > prev_len)
}
@@ -557,7 +531,7 @@ impl WalIngest {
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
// do not materialize null pages because them most likely be soon replaced with real data
@@ -583,7 +557,6 @@ impl WalIngest {
page_set_lsn(&mut image, lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
.await?;
} else {
@@ -1222,7 +1195,7 @@ impl WalIngest {
if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
// Tail of last remaining FSM page has to be zeroed.
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
fsm_physical_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1244,7 +1217,7 @@ impl WalIngest {
if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
// Tail of last remaining vm page has to be zeroed.
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, vm_page_no)?;
modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
vm_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1260,17 +1233,12 @@ impl WalIngest {
fn warn_on_ingest_lag(
&mut self,
conf: &crate::config::PageServerConf,
wal_timestamp: TimestampTz,
wal_timestmap: TimestampTz,
) {
debug_assert_current_span_has_tenant_and_timeline_id();
let now = SystemTime::now();
let rate_limits = &mut self.warn_ingest_lag;
let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
});
match ts {
match try_from_pg_timestamp(wal_timestmap) {
Ok(ts) => {
match now.duration_since(ts) {
Ok(lag) => {
@@ -1280,7 +1248,7 @@ impl WalIngest {
warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
})
}
}
},
Err(e) => {
let delta_t = e.duration();
// determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
@@ -1294,6 +1262,7 @@ impl WalIngest {
}
}
};
}
Err(error) => {
rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
@@ -1401,17 +1370,14 @@ impl WalIngest {
// truncated, but a checkpoint record with the updated values isn't written until
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
// so we keep the oldestXid and oldestXidDB up-to-date.
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestXid = xlrec.oldest_xid;
cp.oldestXidDB = xlrec.oldest_xid_db;
});
self.checkpoint.oldestXid = xlrec.oldest_xid;
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
self.checkpoint_modified = true;
// TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
let latest_page_number =
enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
/ pg_constants::CLOG_XACTS_PER_PAGE;
self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
// Now delete all segments containing pages between xlrec.pageno
// and latest_page_number.
@@ -1419,9 +1385,7 @@ impl WalIngest {
// First, make an important safety check:
// the current endpoint page must not be eligible for removal.
// See SimpleLruTruncate() in slru.c
if dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno)
}) {
if clogpage_precedes(latest_page_number, xlrec.pageno) {
info!("could not truncate directory pg_xact apparent wraparound");
return Ok(());
}
@@ -1438,12 +1402,7 @@ impl WalIngest {
.await?
{
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno)
});
if may_delete {
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
modification
.drop_slru_segment(SlruKind::Clog, segno, ctx)
.await?;
@@ -1562,23 +1521,14 @@ impl WalIngest {
xlrec: &XlMultiXactTruncate,
ctx: &RequestContext,
) -> Result<()> {
let (maxsegment, startsegment, endsegment) =
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestMulti = xlrec.end_trunc_off;
cp.oldestMultiDB = xlrec.oldest_multi_db;
let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
pg_constants::MAX_MULTIXACT_OFFSET,
);
let startsegment: i32 =
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
let endsegment: i32 =
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
(maxsegment, startsegment, endsegment)
});
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
self.checkpoint_modified = true;
// PerformMembersTruncation
let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
let mut segment: i32 = startsegment;
// Delete all the segments except the last one. The last segment can still
@@ -1737,7 +1687,7 @@ impl WalIngest {
continue;
}
modification.put_rel_page_image_zero(rel, gap_blknum)?;
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
}
}
Ok(())
@@ -1803,7 +1753,7 @@ impl WalIngest {
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
}
}
Ok(())
@@ -1852,23 +1802,11 @@ mod tests {
// TODO
}
#[tokio::test]
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
for i in 14..=16 {
dispatch_pgversion!(i, {
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
});
}
Ok(())
}
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(dispatch_pgversion!(
tline.pg_version,
pgv::ZERO_CHECKPOINT.clone()
))?;
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit(ctx).await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
@@ -1889,25 +1827,21 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -2049,7 +1983,6 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
assert_eq!(
tline
@@ -2075,7 +2008,6 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
.await?;
m.on_record_end();
m.commit(&ctx).await?;
assert_eq!(
tline
@@ -2477,6 +2409,7 @@ mod tests {
.await
.unwrap();
let mut modification = tline.begin_modification(startpoint);
let mut decoded = DecodedWALRecord::default();
println!("decoding {} bytes", bytes.len() - xlogoff);
// Decode and ingest wal. We process the wal in chunks because
@@ -2484,10 +2417,8 @@ mod tests {
for chunk in bytes[xlogoff..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
walingest
.ingest_record(decoded, lsn, &mut modification, &ctx)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.instrument(span.clone())
.await
.unwrap();

View File

@@ -160,30 +160,6 @@ pub struct DecodedWALRecord {
pub origin_id: u16,
}
impl DecodedWALRecord {
/// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
/// by reading other existing relations' data blocks. This is more complex to apply than new-style database
/// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
pub(crate) fn is_dbase_create_copy(&self, pg_version: u32) -> bool {
if self.xl_rmid == pg_constants::RM_DBASE_ID {
let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
match pg_version {
14 => {
// Postgres 14 database creations are always the legacy kind
info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
}
15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
_ => {
panic!("Unsupported postgres version {pg_version}")
}
}
} else {
false
}
}
}
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct RelFileNode {

View File

@@ -43,12 +43,13 @@ use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use utils::sync::heavier_once_cell;
/// The real implementation that uses a Postgres process to
/// perform WAL replay.
///
/// Only one thread can use the process at a time, that is controlled by the
/// Mutex. In the future, we might want to launch a pool of processes to allow
/// concurrent replay of multiple records.
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
///
pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,

View File

@@ -1038,12 +1038,9 @@ DetermineEpochStartLsn(WalProposer *wp)
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
{
/*
* However, allow to proceed if last_log_term on the node which gave
* the highest vote (i.e. point where we are going to start writing)
* actually had been won by me; plain restart of walproposer not
* intervened by concurrent compute which wrote WAL is ok.
*
* This avoids compute crash after manual term_bump.
* However, allow to proceed if previously elected leader was me;
* plain restart of walproposer not intervened by concurrent
* compute (who could generate WAL) is ok.
*/
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
@@ -1445,17 +1442,12 @@ RecvAppendResponses(Safekeeper *sk)
if (sk->appendResponse.term > wp->propTerm)
{
/*
*
* Term has changed to higher one, probably another compute is
* running. If this is the case we could PANIC as well because
* likely it inserted some data and our basebackup is unsuitable
* anymore. However, we also bump term manually (term_bump endpoint)
* on safekeepers for migration purposes, in this case we do want
* compute to stay alive. So restart walproposer with FATAL instead
* of panicking; if basebackup is spoiled next election will notice
* this.
* Another compute with higher term is running. Panic to restart
* PG as we likely need to retake basebackup. However, don't dump
* core as this is kinda expected scenario.
*/
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
disable_core_dump();
wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
}

69
poetry.lock generated
View File

@@ -985,38 +985,43 @@ files = [
[[package]]
name = "cryptography"
version = "43.0.1"
version = "42.0.4"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
optional = false
python-versions = ">=3.7"
files = [
{file = "cryptography-43.0.1-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:8385d98f6a3bf8bb2d65a73e17ed87a3ba84f6991c155691c51112075f9ffc5d"},
{file = "cryptography-43.0.1-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:27e613d7077ac613e399270253259d9d53872aaf657471473ebfc9a52935c062"},
{file = "cryptography-43.0.1-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:68aaecc4178e90719e95298515979814bda0cbada1256a4485414860bd7ab962"},
{file = "cryptography-43.0.1-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:de41fd81a41e53267cb020bb3a7212861da53a7d39f863585d13ea11049cf277"},
{file = "cryptography-43.0.1-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:f98bf604c82c416bc829e490c700ca1553eafdf2912a91e23a79d97d9801372a"},
{file = "cryptography-43.0.1-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:61ec41068b7b74268fa86e3e9e12b9f0c21fcf65434571dbb13d954bceb08042"},
{file = "cryptography-43.0.1-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:014f58110f53237ace6a408b5beb6c427b64e084eb451ef25a28308270086494"},
{file = "cryptography-43.0.1-cp37-abi3-win32.whl", hash = "sha256:2bd51274dcd59f09dd952afb696bf9c61a7a49dfc764c04dd33ef7a6b502a1e2"},
{file = "cryptography-43.0.1-cp37-abi3-win_amd64.whl", hash = "sha256:666ae11966643886c2987b3b721899d250855718d6d9ce41b521252a17985f4d"},
{file = "cryptography-43.0.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:ac119bb76b9faa00f48128b7f5679e1d8d437365c5d26f1c2c3f0da4ce1b553d"},
{file = "cryptography-43.0.1-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1bbcce1a551e262dfbafb6e6252f1ae36a248e615ca44ba302df077a846a8806"},
{file = "cryptography-43.0.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:58d4e9129985185a06d849aa6df265bdd5a74ca6e1b736a77959b498e0505b85"},
{file = "cryptography-43.0.1-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:d03a475165f3134f773d1388aeb19c2d25ba88b6a9733c5c590b9ff7bbfa2e0c"},
{file = "cryptography-43.0.1-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:511f4273808ab590912a93ddb4e3914dfd8a388fed883361b02dea3791f292e1"},
{file = "cryptography-43.0.1-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:80eda8b3e173f0f247f711eef62be51b599b5d425c429b5d4ca6a05e9e856baa"},
{file = "cryptography-43.0.1-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:38926c50cff6f533f8a2dae3d7f19541432610d114a70808f0926d5aaa7121e4"},
{file = "cryptography-43.0.1-cp39-abi3-win32.whl", hash = "sha256:a575913fb06e05e6b4b814d7f7468c2c660e8bb16d8d5a1faf9b33ccc569dd47"},
{file = "cryptography-43.0.1-cp39-abi3-win_amd64.whl", hash = "sha256:d75601ad10b059ec832e78823b348bfa1a59f6b8d545db3a24fd44362a1564cb"},
{file = "cryptography-43.0.1-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:ea25acb556320250756e53f9e20a4177515f012c9eaea17eb7587a8c4d8ae034"},
{file = "cryptography-43.0.1-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:c1332724be35d23a854994ff0b66530119500b6053d0bd3363265f7e5e77288d"},
{file = "cryptography-43.0.1-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:fba1007b3ef89946dbbb515aeeb41e30203b004f0b4b00e5e16078b518563289"},
{file = "cryptography-43.0.1-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:5b43d1ea6b378b54a1dc99dd8a2b5be47658fe9a7ce0a58ff0b55f4b43ef2b84"},
{file = "cryptography-43.0.1-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:88cce104c36870d70c49c7c8fd22885875d950d9ee6ab54df2745f83ba0dc365"},
{file = "cryptography-43.0.1-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:9d3cdb25fa98afdd3d0892d132b8d7139e2c087da1712041f6b762e4f807cc96"},
{file = "cryptography-43.0.1-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:e710bf40870f4db63c3d7d929aa9e09e4e7ee219e703f949ec4073b4294f6172"},
{file = "cryptography-43.0.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:7c05650fe8023c5ed0d46793d4b7d7e6cd9c04e68eabe5b0aeea836e37bdcec2"},
{file = "cryptography-43.0.1.tar.gz", hash = "sha256:203e92a75716d8cfb491dc47c79e17d0d9207ccffcbcb35f598fbe463ae3444d"},
{file = "cryptography-42.0.4-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:ffc73996c4fca3d2b6c1c8c12bfd3ad00def8621da24f547626bf06441400449"},
{file = "cryptography-42.0.4-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:db4b65b02f59035037fde0998974d84244a64c3265bdef32a827ab9b63d61b18"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dad9c385ba8ee025bb0d856714f71d7840020fe176ae0229de618f14dae7a6e2"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:69b22ab6506a3fe483d67d1ed878e1602bdd5912a134e6202c1ec672233241c1"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:e09469a2cec88fb7b078e16d4adec594414397e8879a4341c6ace96013463d5b"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:3e970a2119507d0b104f0a8e281521ad28fc26f2820687b3436b8c9a5fcf20d1"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:e53dc41cda40b248ebc40b83b31516487f7db95ab8ceac1f042626bc43a2f992"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:c3a5cbc620e1e17009f30dd34cb0d85c987afd21c41a74352d1719be33380885"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6bfadd884e7280df24d26f2186e4e07556a05d37393b0f220a840b083dc6a824"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:01911714117642a3f1792c7f376db572aadadbafcd8d75bb527166009c9f1d1b"},
{file = "cryptography-42.0.4-cp37-abi3-win32.whl", hash = "sha256:fb0cef872d8193e487fc6bdb08559c3aa41b659a7d9be48b2e10747f47863925"},
{file = "cryptography-42.0.4-cp37-abi3-win_amd64.whl", hash = "sha256:c1f25b252d2c87088abc8bbc4f1ecbf7c919e05508a7e8628e6875c40bc70923"},
{file = "cryptography-42.0.4-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:15a1fb843c48b4a604663fa30af60818cd28f895572386e5f9b8a665874c26e7"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a1327f280c824ff7885bdeef8578f74690e9079267c1c8bd7dc5cc5aa065ae52"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ffb03d419edcab93b4b19c22ee80c007fb2d708429cecebf1dd3258956a563a"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:1df6fcbf60560d2113b5ed90f072dc0b108d64750d4cbd46a21ec882c7aefce9"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:44a64043f743485925d3bcac548d05df0f9bb445c5fcca6681889c7c3ab12764"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:3c6048f217533d89f2f8f4f0fe3044bf0b2090453b7b73d0b77db47b80af8dff"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:6d0fbe73728c44ca3a241eff9aefe6496ab2656d6e7a4ea2459865f2e8613257"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:887623fe0d70f48ab3f5e4dbf234986b1329a64c066d719432d0698522749929"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:ce8613beaffc7c14f091497346ef117c1798c202b01153a8cc7b8e2ebaaf41c0"},
{file = "cryptography-42.0.4-cp39-abi3-win32.whl", hash = "sha256:810bcf151caefc03e51a3d61e53335cd5c7316c0a105cc695f0959f2c638b129"},
{file = "cryptography-42.0.4-cp39-abi3-win_amd64.whl", hash = "sha256:a0298bdc6e98ca21382afe914c642620370ce0470a01e1bef6dd9b5354c36854"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5f8907fcf57392cd917892ae83708761c6ff3c37a8e835d7246ff0ad251d9298"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:12d341bd42cdb7d4937b0cabbdf2a94f949413ac4504904d0cdbdce4a22cbf88"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:1cdcdbd117681c88d717437ada72bdd5be9de117f96e3f4d50dab3f59fd9ab20"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:0e89f7b84f421c56e7ff69f11c441ebda73b8a8e6488d322ef71746224c20fce"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:f1e85a178384bf19e36779d91ff35c7617c885da487d689b05c1366f9933ad74"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:d2a27aca5597c8a71abbe10209184e1a8e91c1fd470b5070a2ea60cafec35bcd"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:4e36685cb634af55e0677d435d425043967ac2f3790ec652b2b88ad03b85c27b"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:f47be41843200f7faec0683ad751e5ef11b9a56a220d57f300376cd8aba81660"},
{file = "cryptography-42.0.4.tar.gz", hash = "sha256:831a4b37accef30cccd34fcb916a5d7b5be3cbbe27268a02832c3e450aea39cb"},
]
[package.dependencies]
@@ -1029,7 +1034,7 @@ nox = ["nox"]
pep8test = ["check-sdist", "click", "mypy", "ruff"]
sdist = ["build"]
ssh = ["bcrypt (>=3.1.5)"]
test = ["certifi", "cryptography-vectors (==43.0.1)", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"]
test = ["certifi", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"]
test-randomorder = ["pytest-randomly"]
[[package]]
@@ -1105,13 +1110,13 @@ dotenv = ["python-dotenv"]
[[package]]
name = "flask-cors"
version = "5.0.0"
version = "4.0.1"
description = "A Flask extension adding a decorator for CORS support"
optional = false
python-versions = "*"
files = [
{file = "Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc"},
{file = "flask_cors-5.0.0.tar.gz", hash = "sha256:5aadb4b950c4e93745034594d9f3ea6591f734bb3662e16e255ffbf5e89c88ef"},
{file = "Flask_Cors-4.0.1-py2.py3-none-any.whl", hash = "sha256:f2a704e4458665580c074b714c4627dd5a306b333deb9074d0b1794dfa2fb677"},
{file = "flask_cors-4.0.1.tar.gz", hash = "sha256:eeb69b342142fdbf4766ad99357a7f3876a2ceb77689dc10ff912aac06c389e4"},
]
[package.dependencies]

View File

@@ -311,9 +311,7 @@ async fn auth_quirks(
let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;
// check allowed list
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
@@ -605,7 +603,6 @@ mod tests {
rate_limiter_enabled: true,
rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
});
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {

View File

@@ -538,17 +538,4 @@ mod tests {
));
Ok(())
}
#[test]
fn test_connection_blocker() {
fn check(v: serde_json::Value) -> bool {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
let ip_list: Vec<IpPattern> = serde_json::from_value(v).unwrap();
check_peer_addr_is_in_list(&peer_addr, &ip_list)
}
assert!(check(json!([])));
assert!(check(json!(["127.0.0.1"])));
assert!(!check(json!(["255.255.255.255"])));
}
}

Some files were not shown because too many files have changed in this diff Show More