Merge branch 'main' into amasterov/regress-arm

This commit is contained in:
a-masterov
2024-09-10 14:29:04 +02:00
committed by GitHub
149 changed files with 2062 additions and 1191 deletions

View File

@@ -7,6 +7,13 @@ self-hosted-runner:
- small-arm64
- us-east-2
config-variables:
- AZURE_DEV_CLIENT_ID
- AZURE_DEV_REGISTRY_NAME
- AZURE_DEV_SUBSCRIPTION_ID
- AZURE_PROD_CLIENT_ID
- AZURE_PROD_REGISTRY_NAME
- AZURE_PROD_SUBSCRIPTION_ID
- AZURE_TENANT_ID
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB
- REMOTE_STORAGE_AZURE_CONTAINER

56
.github/workflows/_push-to-acr.yml vendored Normal file
View File

@@ -0,0 +1,56 @@
name: Push images to ACR
on:
workflow_call:
inputs:
client_id:
description: Client ID of Azure managed identity or Entra app
required: true
type: string
image_tag:
description: Tag for the container image
required: true
type: string
images:
description: Images to push
required: true
type: string
registry_name:
description: Name of the container registry
required: true
type: string
subscription_id:
description: Azure subscription ID
required: true
type: string
tenant_id:
description: Azure tenant ID
required: true
type: string
jobs:
push-to-acr:
runs-on: ubuntu-22.04
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
steps:
- name: Azure login
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ inputs.client_id }}
subscription-id: ${{ inputs.subscription_id }}
tenant-id: ${{ inputs.tenant_id }}
- name: Login to ACR
run: |
az acr login --name=${{ inputs.registry_name }}
- name: Copy docker images to ACR ${{ inputs.registry_name }}
run: |
images='${{ inputs.images }}'
for image in ${images}; do
docker buildx imagetools create \
-t ${{ inputs.registry_name }}.azurecr.io/neondatabase/${image}:${{ inputs.image_tag }} \
neondatabase/${image}:${{ inputs.image_tag }}
done

View File

@@ -794,9 +794,6 @@ jobs:
docker compose -f ./docker-compose/docker-compose.yml down
promote-images:
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04
@@ -823,28 +820,6 @@ jobs:
neondatabase/vm-compute-node-${version}:${{ needs.tag.outputs.build-tag }}
done
- name: Azure login
if: github.ref_name == 'main'
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: Login to ACR
if: github.ref_name == 'main'
run: |
az acr login --name=neoneastus2
- name: Copy docker images to ACR-dev
if: github.ref_name == 'main'
run: |
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
docker buildx imagetools create \
-t neoneastus2.azurecr.io/neondatabase/${image}:${{ needs.tag.outputs.build-tag }} \
neondatabase/${image}:${{ needs.tag.outputs.build-tag }}
done
- name: Add latest tag to images
if: github.ref_name == 'main'
run: |
@@ -882,6 +857,30 @@ jobs:
369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
done
push-to-acr-dev:
if: github.ref_name == 'main'
needs: [ tag, promote-images ]
uses: ./.github/workflows/_push-to-acr.yml
with:
client_id: ${{ vars.AZURE_DEV_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
push-to-acr-prod:
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
needs: [ tag, promote-images ]
uses: ./.github/workflows/_push-to-acr.yml
with:
client_id: ${{ vars.AZURE_PROD_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, tag ]
runs-on: ubuntu-22.04
@@ -957,8 +956,8 @@ jobs:
exit 1
deploy:
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait, push-to-acr-dev, push-to-acr-prod ]
if: (github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy') && !failure() && !cancelled()
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest

View File

@@ -7,6 +7,11 @@ on:
pull_request_target:
types:
- opened
workflow_dispatch:
inputs:
github-actor:
description: 'GitHub username. If empty, the username of the current user will be used'
required: false
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
@@ -26,12 +31,31 @@ jobs:
id: check-user
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
ACTOR: ${{ inputs.github-actor || github.actor }}
run: |
if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then
is_member=true
else
is_member=false
fi
expected_error="User does not exist or is not a member of the organization"
output_file=output.txt
for i in $(seq 1 10); do
if gh api "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${ACTOR}" \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" > ${output_file}; then
is_member=true
break
elif grep -q "${expected_error}" ${output_file}; then
is_member=false
break
elif [ $i -eq 10 ]; then
title="Failed to get memmbership status for ${ACTOR}"
message="The latest GitHub API error message: '$(cat ${output_file})'"
echo "::error file=.github/workflows/label-for-external-users.yml,title=${title}::${message}"
exit 1
fi
sleep 1
done
echo "is-member=${is_member}" | tee -a ${GITHUB_OUTPUT}

150
Cargo.lock generated
View File

@@ -915,25 +915,22 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.65.1"
version = "0.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.4.1",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"itertools 0.12.1",
"log",
"peeking_take_while",
"prettyplease 0.2.6",
"prettyplease 0.2.17",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 2.0.52",
"which",
]
[[package]]
@@ -1192,9 +1189,9 @@ dependencies = [
[[package]]
name = "comfy-table"
version = "6.1.4"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e7b787b0dc42e8111badfdbe4c3059158ccb2db8780352fa1b01e8ccf45cc4d"
checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
dependencies = [
"crossterm",
"strum",
@@ -1249,7 +1246,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -1363,8 +1360,8 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-util",
"toml 0.7.4",
"toml_edit 0.19.10",
"toml",
"toml_edit",
"tracing",
"url",
"utils",
@@ -1488,25 +1485,22 @@ checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
[[package]]
name = "crossterm"
version = "0.25.0"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67"
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.4.1",
"crossterm_winapi",
"libc",
"mio",
"parking_lot 0.12.1",
"signal-hook",
"signal-hook-mio",
"winapi",
]
[[package]]
name = "crossterm_winapi"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c"
checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b"
dependencies = [
"winapi",
]
@@ -2949,12 +2943,6 @@ dependencies = [
"spin 0.5.2",
]
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.150"
@@ -3153,7 +3141,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd01039851e82f8799046eabbb354056283fb265c8ec0996af940f4e85a380ff"
dependencies = [
"serde",
"toml 0.8.14",
"toml",
]
[[package]]
@@ -3669,7 +3657,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"utils",
"workspace_hack",
]
@@ -3756,7 +3744,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"twox-hash",
"url",
@@ -3919,8 +3907,9 @@ dependencies = [
[[package]]
name = "parquet"
version = "51.0.0"
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0fbf928021131daaa57d334ca8e3904fe9ae22f73c56244fc7db9b04eedc3d8"
dependencies = [
"ahash",
"bytes",
@@ -3939,8 +3928,9 @@ dependencies = [
[[package]]
name = "parquet_derive"
version = "51.0.0"
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86e9fcfae007533a06b580429a3f7e07cb833ec8aa37c041c16563e7918f057e"
dependencies = [
"parquet",
"proc-macro2",
@@ -3977,12 +3967,6 @@ dependencies = [
"sha2",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "pem"
version = "3.0.3"
@@ -4136,7 +4120,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4149,7 +4133,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -4168,7 +4152,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4280,9 +4264,9 @@ dependencies = [
[[package]]
name = "prettyplease"
version = "0.2.6"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b69d39aab54d069e7f2fe8cb970493e7834601ca2d8c65fd7bbd183578080d1"
checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7"
dependencies = [
"proc-macro2",
"syn 2.0.52",
@@ -4827,7 +4811,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"utils",
]
@@ -5337,7 +5321,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"tracing-subscriber",
"url",
@@ -5746,17 +5730,6 @@ dependencies = [
"signal-hook-registry",
]
[[package]]
name = "signal-hook-mio"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
dependencies = [
"libc",
"mio",
"signal-hook",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@@ -6069,21 +6042,21 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strum"
version = "0.24.1"
version = "0.26.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
[[package]]
name = "strum_macros"
version = "0.24.3"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
dependencies = [
"heck 0.4.1",
"heck 0.5.0",
"proc-macro2",
"quote",
"rustversion",
"syn 1.0.109",
"syn 2.0.52",
]
[[package]]
@@ -6094,8 +6067,9 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "svg_fmt"
version = "0.4.2"
source = "git+https://github.com/nical/rust_debug?rev=28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4#28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20e16a0f46cf5fd675563ef54f26e83e20f2366bcf027bcb3cc3ed2b98aaf2ca"
[[package]]
name = "syn"
@@ -6423,7 +6397,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"async-trait",
"byteorder",
@@ -6534,18 +6508,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "toml"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.19.10",
]
[[package]]
name = "toml"
version = "0.8.14"
@@ -6555,7 +6517,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.22.14",
"toml_edit",
]
[[package]]
@@ -6567,19 +6529,6 @@ dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.19.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739"
dependencies = [
"indexmap 1.9.3",
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.4.6",
]
[[package]]
name = "toml_edit"
version = "0.22.14"
@@ -6590,7 +6539,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.6.13",
"winnow",
]
[[package]]
@@ -7003,7 +6952,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"tracing-error",
"tracing-subscriber",
@@ -7549,15 +7498,6 @@ version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
[[package]]
name = "winnow"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699"
dependencies = [
"memchr",
]
[[package]]
name = "winnow"
version = "0.6.13"
@@ -7627,6 +7567,7 @@ dependencies = [
"hyper 0.14.26",
"indexmap 1.9.3",
"itertools 0.10.5",
"itertools 0.12.1",
"lazy_static",
"libc",
"log",
@@ -7664,6 +7605,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.24.0",
"tokio-util",
"toml_edit",
"tonic",
"tower",
"tracing",

View File

@@ -64,7 +64,7 @@ aws-types = "1.2.0"
axum = { version = "0.6.20", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.65"
bindgen = "0.70"
bit_field = "0.10.2"
bstr = "1.0"
byteorder = "1.4"
@@ -73,7 +73,7 @@ camino = "1.1.6"
cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive"] }
comfy-table = "6.1"
comfy-table = "7.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-deque = "0.8.5"
@@ -123,8 +123,8 @@ opentelemetry = "0.20.0"
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.12.0"
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
parquet_derive = "51.0.0"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.16"
@@ -158,11 +158,10 @@ signal-hook = "0.3"
smallvec = "1.11"
smol_str = { version = "0.2.0", features = ["serde"] }
socket2 = "0.5"
strum = "0.24"
strum_macros = "0.24"
strum = "0.26"
strum_macros = "0.26"
"subtle" = "2.5.0"
# Our PR https://github.com/nical/rust_debug/pull/4 has been merged but no new version released yet
svg_fmt = { git = "https://github.com/nical/rust_debug", rev = "28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4" }
svg_fmt = "0.4.3"
sync_wrapper = "0.1.2"
tar = "0.4"
task-local-extensions = "0.1.4"
@@ -178,8 +177,8 @@ tokio-rustls = "0.25"
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.7"
toml_edit = "0.19"
toml = "0.8"
toml_edit = "0.22"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tower-service = "0.3.2"
tracing = "0.1"
@@ -202,10 +201,21 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
# We want to use the 'neon' branch for these, but there's currently one
# incompatible change on the branch. See:
#
# - PR #8076 which contained changes that depended on the new changes in
# the rust-postgres crate, and
# - PR #8654 which reverted those changes and made the code in proxy incompatible
# with the tip of the 'neon' branch again.
#
# When those proxy changes are re-applied (see PR #8747), we can switch using
# the tip of the 'neon' branch again.
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -242,11 +252,7 @@ tonic-build = "0.9"
[patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
# bug fixes for UUID
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master" }
parquet_derive = { git = "https://github.com/apache/arrow-rs", branch = "master" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
################# Binary contents sections

View File

@@ -87,6 +87,7 @@ RUN mkdir -p /data/.neon/ && \
"pg_distrib_dir='/usr/local/'\n" \
"listen_pg_addr='0.0.0.0:6400'\n" \
"listen_http_addr='0.0.0.0:9898'\n" \
"availability_zone='local'\n" \
> /data/.neon/pageserver.toml && \
chown -R neon:neon /data/.neon

View File

@@ -192,7 +192,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.80.1
ENV RUSTC_VERSION=1.81.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1
@@ -207,7 +207,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
export PATH="$HOME/.cargo/bin:$PATH" && \
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools-preview rustfmt clippy && \
rustup component add llvm-tools rustfmt clippy && \
cargo install rustfilt --version ${RUSTFILT_VERSION} && \
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} && \
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \

View File

@@ -64,6 +64,12 @@ brew install protobuf openssl flex bison icu4c pkg-config
echo 'export PATH="$(brew --prefix openssl)/bin:$PATH"' >> ~/.zshrc
```
If you get errors about missing `m4` you may have to install it manually:
```
brew install m4
brew link --force m4
```
2. [Install Rust](https://www.rust-lang.org/tools/install)
```
# recommended approach from https://www.rust-lang.org/tools/install

View File

@@ -22,9 +22,10 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Escape a string for including it in a SQL literal. Wrapping the result
/// with `E'{}'` or `'{}'` is not required, as it returns a ready-to-use
/// SQL string literal, e.g. `'db'''` or `E'db\\'`.
/// Escape a string for including it in a SQL literal.
///
/// Wrapping the result with `E'{}'` or `'{}'` is not required,
/// as it returns a ready-to-use SQL string literal, e.g. `'db'''` or `E'db\\'`.
/// See <https://github.com/postgres/postgres/blob/da98d005cdbcd45af563d0c4ac86d0e9772cd15f/src/backend/utils/adt/quote.c#L47>
/// for the original implementation.
pub fn escape_literal(s: &str) -> String {

View File

@@ -75,14 +75,14 @@ impl PageServerNode {
}
}
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::Document {
toml_edit::Document::from_str(&format!("id={node_id}")).unwrap()
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::DocumentMut {
toml_edit::DocumentMut::from_str(&format!("id={node_id}")).unwrap()
}
fn pageserver_init_make_toml(
&self,
conf: NeonLocalInitPageserverConf,
) -> anyhow::Result<toml_edit::Document> {
) -> anyhow::Result<toml_edit::DocumentMut> {
assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully");
// TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656)
@@ -137,9 +137,9 @@ impl PageServerNode {
// Turn `overrides` into a toml document.
// TODO: above code is legacy code, it should be refactored to use toml_edit directly.
let mut config_toml = toml_edit::Document::new();
let mut config_toml = toml_edit::DocumentMut::new();
for fragment_str in overrides {
let fragment = toml_edit::Document::from_str(&fragment_str)
let fragment = toml_edit::DocumentMut::from_str(&fragment_str)
.expect("all fragments in `overrides` are valid toml documents, this function controls that");
for (key, item) in fragment.iter() {
config_toml.insert(key, item.clone());

View File

@@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest,
NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -80,7 +80,10 @@ enum Command {
/// List nodes known to the storage controller
Nodes {},
/// List tenants known to the storage controller
Tenants {},
Tenants {
/// If this field is set, it will list the tenants on a specific node
node_id: Option<NodeId>,
},
/// Create a new tenant in the storage controller, and by extension on pageservers.
TenantCreate {
#[arg(long)]
@@ -336,7 +339,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
availability_zone_id: Some(availability_zone_id),
availability_zone_id,
}),
)
.await?;
@@ -403,7 +406,41 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::Tenants {} => {
Command::Tenants {
node_id: Some(node_id),
} => {
let describe_response = storcon_client
.dispatch::<(), NodeShardResponse>(
Method::GET,
format!("control/v1/node/{node_id}/shards"),
None,
)
.await?;
let shards = describe_response.shards;
let mut table = comfy_table::Table::new();
table.set_header([
"Shard",
"Intended Primary/Secondary",
"Observed Primary/Secondary",
]);
for shard in shards {
table.add_row([
format!("{}", shard.tenant_shard_id),
match shard.is_intended_secondary {
None => "".to_string(),
Some(true) => "Secondary".to_string(),
Some(false) => "Primary".to_string(),
},
match shard.is_observed_secondary {
None => "".to_string(),
Some(true) => "Secondary".to_string(),
Some(false) => "Primary".to_string(),
},
]);
}
println!("{table}");
}
Command::Tenants { node_id: None } => {
let mut resp = storcon_client
.dispatch::<(), Vec<TenantDescribeResponse>>(
Method::GET,

View File

@@ -68,6 +68,7 @@ macro_rules! register_uint_gauge {
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
///
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
/// while holding the lock.
pub fn register_internal(c: Box<dyn Collector>) -> prometheus::Result<()> {

View File

@@ -104,7 +104,9 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
#[serde(skip_serializing)]
// TODO(https://github.com/neondatabase/neon/issues/8184): remove after this field is removed from all pageserver.toml's
pub compact_level0_phase1_value_access: serde::de::IgnoredAny,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
}
@@ -209,43 +211,6 @@ pub enum GetImpl {
#[serde(transparent)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum CompactL0Phase1ValueAccess {
/// The old way.
PageCachedBlobIo,
/// The new way.
StreamingKmerge {
/// If set, we run both the old way and the new way, validate that
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
/// and if the validation fails,
/// - in tests: fail them with a panic or
/// - in prod, log a rate-limited warning and use the old way's results.
///
/// If not set, we only run the new way and trust its results.
validate: Option<CompactL0BypassPageCacheValidation>,
},
}
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum CompactL0BypassPageCacheValidation {
/// Validate that the series of (key, lsn) pairs are the same.
KeyLsn,
/// Validate that the entire output of old and new way is identical.
KeyLsnValue,
}
impl Default for CompactL0Phase1ValueAccess {
fn default() -> Self {
CompactL0Phase1ValueAccess::StreamingKmerge {
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
}
}
}
/// A tenant's calcuated configuration, which is the result of merging a
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
///
@@ -452,7 +417,7 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
compact_level0_phase1_value_access: Default::default(),
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,

View File

@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::{Duration, Instant};
@@ -57,7 +57,7 @@ pub struct NodeRegisterRequest {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub availability_zone_id: Option<String>,
pub availability_zone_id: String,
}
#[derive(Serialize, Deserialize)]
@@ -74,6 +74,17 @@ pub struct TenantPolicyRequest {
pub scheduling: Option<ShardSchedulingPolicy>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
#[serde(flatten)]
pub preferred_az_ids: HashMap<TenantShardId, String>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
pub updated: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
pub shard_id: TenantShardId,
@@ -101,6 +112,21 @@ pub struct TenantDescribeResponse {
pub config: TenantConfig,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
pub node_id: NodeId,
pub shards: Vec<NodeShard>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShard {
pub tenant_shard_id: TenantShardId,
/// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
pub is_observed_secondary: Option<bool>,
/// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
pub is_intended_secondary: Option<bool>,
}
#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
pub id: NodeId,
@@ -132,8 +158,12 @@ pub struct TenantDescribeResponseShard {
pub is_splitting: bool,
pub scheduling_policy: ShardSchedulingPolicy,
pub preferred_az_id: Option<String>,
}
/// Migration request for a given tenant shard to a given node.
///
/// Explicitly migrating a particular shard is a low level operation
/// TODO: higher level "Reschedule tenant" operation where the request
/// specifies some constraints, e.g. asking it to get off particular node(s)

View File

@@ -263,15 +263,6 @@ impl Key {
field5: u8::MAX,
field6: u32::MAX,
};
/// A key slightly smaller than [`Key::MAX`] for use in layer key ranges to avoid them to be confused with L0 layers
pub const NON_L0_MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX - 1,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {

View File

@@ -62,7 +62,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::EnumVariantNames,
strum_macros::VariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
@@ -305,8 +305,10 @@ pub struct TenantConfig {
pub lsn_lease_length_for_ts: Option<String>,
}
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
/// tenant config. When the first aux file written, the policy will be persisted in the
/// The policy for the aux file storage.
///
/// It can be switched through `switch_aux_file_policy` tenant config.
/// When the first aux file written, the policy will be persisted in the
/// `index_part.json` file and has a limited migration path.
///
/// Currently, we only allow the following migration path:
@@ -896,7 +898,9 @@ pub struct WalRedoManagerStatus {
pub process: Option<WalRedoManagerProcessStatus>,
}
/// The progress of a secondary tenant is mostly useful when doing a long running download: e.g. initiating
/// The progress of a secondary tenant.
///
/// It is mostly useful when doing a long running download: e.g. initiating
/// a download job, timing out while waiting for it to run, and then inspecting this status to understand
/// what's happening.
#[derive(Default, Debug, Serialize, Deserialize, Clone)]

View File

@@ -69,8 +69,10 @@ impl QueryError {
}
/// Returns true if the given error is a normal consequence of a network issue,
/// or the client closing the connection. These errors can happen during normal
/// operations, and don't indicate a bug in our code.
/// or the client closing the connection.
///
/// These errors can happen during normal operations,
/// and don't indicate a bug in our code.
pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(
@@ -79,17 +81,16 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
pub trait Handler<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might be not what we want after CopyData streaming, but currently we don't
/// care). It will also flush out the output buffer.
async fn process_query(
fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError>;
) -> impl Future<Output = Result<(), QueryError>>;
/// Called on startup packet receival, allows to process params.
///

View File

@@ -23,7 +23,6 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
struct TestHandler {}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
// return single col 'hey' for any query
async fn process_query(

View File

@@ -7,6 +7,7 @@ use std::fmt;
use url::Host;
/// Parses a string of format either `host:port` or `host` into a corresponding pair.
///
/// The `host` part should be a correct `url::Host`, while `port` (if present) should be
/// a valid decimal u16 of digits only.
pub fn parse_host_port<S: AsRef<str>>(host_port: S) -> Result<(Host, Option<u16>), anyhow::Error> {

View File

@@ -14,7 +14,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
fn include_file(&self, filename: &str) {
// This does the equivalent of passing bindgen::CargoCallbacks
// to the builder .parse_callbacks() method.
let cargo_callbacks = bindgen::CargoCallbacks;
let cargo_callbacks = bindgen::CargoCallbacks::new();
cargo_callbacks.include_file(filename)
}
@@ -121,6 +121,7 @@ fn main() -> anyhow::Result<()> {
.allowlist_type("XLogPageHeaderData")
.allowlist_type("XLogLongPageHeaderData")
.allowlist_var("XLOG_PAGE_MAGIC")
.allowlist_var("PG_MAJORVERSION_NUM")
.allowlist_var("PG_CONTROL_FILE_SIZE")
.allowlist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC")
.allowlist_type("PageHeaderData")

View File

@@ -44,6 +44,9 @@ macro_rules! postgres_ffi {
// Re-export some symbols from bindings
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
pub const ZERO_CHECKPOINT: bytes::Bytes =
bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
}
};
}
@@ -106,6 +109,107 @@ macro_rules! dispatch_pgversion {
};
}
#[macro_export]
macro_rules! enum_pgversion_dispatch {
($name:expr, $typ:ident, $bind:ident, $code:block) => {
enum_pgversion_dispatch!(
name = $name,
bind = $bind,
typ = $typ,
code = $code,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
)
};
(name = $name:expr,
bind = $bind:ident,
typ = $typ:ident,
code = $code:block,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]) => {
match $name {
$(
self::$typ::$variant($bind) => {
use $crate::$md as pgv;
$code
}
),+,
}
};
}
#[macro_export]
macro_rules! enum_pgversion {
{$name:ident, pgv :: $t:ident} => {
enum_pgversion!{
name = $name,
typ = $t,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
}
};
{$name:ident, pgv :: $p:ident :: $t:ident} => {
enum_pgversion!{
name = $name,
path = $p,
typ = $t,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
}
};
{name = $name:ident,
typ = $t:ident,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
pub enum $name {
$($variant ( $crate::$md::$t )),+
}
impl self::$name {
pub fn pg_version(&self) -> u32 {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
})
}
}
$(
impl Into<self::$name> for $crate::$md::$t {
fn into(self) -> self::$name {
self::$name::$variant (self)
}
}
)+
};
{name = $name:ident,
path = $p:ident,
typ = $t:ident,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
pub enum $name {
$($variant ($crate::$md::$p::$t)),+
}
impl $name {
pub fn pg_version(&self) -> u32 {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
})
}
}
$(
impl Into<$name> for $crate::$md::$p::$t {
fn into(self) -> $name {
$name::$variant (self)
}
}
)+
};
}
pub mod pg_constants;
pub mod relfile_utils;

View File

@@ -185,7 +185,7 @@ mod tests {
use super::*;
fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
let toml = input.parse::<toml_edit::Document>().unwrap();
let toml = input.parse::<toml_edit::DocumentMut>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}

View File

@@ -45,6 +45,8 @@ pub use azure_core::Etag;
pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
/// Default concurrency limit for S3 operations
///
/// Currently, sync happens with AWS S3, that has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
@@ -300,7 +302,9 @@ pub trait RemoteStorage: Send + Sync + 'static {
) -> Result<(), TimeTravelError>;
}
/// DownloadStream is sensitive to the timeout and cancellation used with the original
/// Data part of an ongoing [`Download`].
///
/// `DownloadStream` is sensitive to the timeout and cancellation used with the original
/// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
/// with `tokio::io::copy_buf`.
// This has 'static because safekeepers do not use cancellation tokens (yet)

View File

@@ -60,3 +60,16 @@ pub struct TimelineCopyRequest {
pub target_timeline_id: TimelineId,
pub until_lsn: Lsn,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpRequest {
/// bump to
pub term: Option<u64>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpResponse {
// before the request
pub previous_term: u64,
pub current_term: u64,
}

View File

@@ -5,9 +5,10 @@
mod calculation;
pub mod svg;
/// StorageModel is the input to the synthetic size calculation. It represents
/// a tree of timelines, with just the information that's needed for the
/// calculation. This doesn't track timeline names or where each timeline
/// StorageModel is the input to the synthetic size calculation.
///
/// It represents a tree of timelines, with just the information that's needed
/// for the calculation. This doesn't track timeline names or where each timeline
/// begins and ends, for example. Instead, it consists of "points of interest"
/// on the timelines. A point of interest could be the timeline start or end point,
/// the oldest point on a timeline that needs to be retained because of PITR

View File

@@ -5,8 +5,10 @@ use std::{
use metrics::IntCounter;
/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly,
/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and
/// Circuit breakers are for operations that are expensive and fallible.
///
/// If a circuit breaker fails repeatedly, we will stop attempting it for some
/// period of time, to avoid denial-of-service from retries, and
/// to mitigate the log spam from repeated failures.
pub struct CircuitBreaker {
/// An identifier that enables us to log useful errors when a circuit is broken

View File

@@ -1,3 +1,4 @@
use std::os::fd::AsRawFd;
use std::{
borrow::Cow,
fs::{self, File},
@@ -203,6 +204,27 @@ pub fn overwrite(
Ok(())
}
/// Syncs the filesystem for the given file descriptor.
#[cfg_attr(target_os = "macos", allow(unused_variables))]
pub fn syncfs(fd: impl AsRawFd) -> anyhow::Result<()> {
// Linux guarantees durability for syncfs.
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
#[cfg(target_os = "linux")]
{
use anyhow::Context;
nix::unistd::syncfs(fd.as_raw_fd()).context("syncfs")?;
}
#[cfg(target_os = "macos")]
{
// macOS is not a production platform for Neon, don't even bother.
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
compile_error!("Unsupported OS");
}
Ok(())
}
#[cfg(test)]
mod tests {

View File

@@ -249,8 +249,10 @@ macro_rules! id_newtype {
};
}
/// Neon timeline IDs are different from PostgreSQL timeline
/// IDs. They serve a similar purpose though: they differentiate
/// Neon timeline ID.
///
/// They are different from PostgreSQL timeline
/// IDs, but serve a similar purpose: they differentiate
/// between different "histories" of the same cluster. However,
/// PostgreSQL timeline IDs are a bit cumbersome, because they are only
/// 32-bits wide, and they must be in ascending order in any given

View File

@@ -100,7 +100,9 @@ pub enum LockFileRead {
}
/// Open & try to lock the lock file at the given `path`, returning a [handle][`LockFileRead`] to
/// inspect its content. It is not an `Err(...)` if the file does not exist or is already locked.
/// inspect its content.
///
/// It is not an `Err(...)` if the file does not exist or is already locked.
/// Check the [`LockFileRead`] variants for details.
pub fn read_and_hold_lock_file(path: &Utf8Path) -> anyhow::Result<LockFileRead> {
let res = fs::OpenOptions::new().read(true).open(path);

View File

@@ -3,11 +3,9 @@ use std::str::FromStr;
use anyhow::Context;
use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, EnumVariantNames};
use strum_macros::{EnumString, VariantNames};
#[derive(
EnumString, strum_macros::Display, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy,
)]
#[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
Plain,
@@ -190,7 +188,7 @@ impl Drop for TracingPanicHookGuard {
}
/// Named symbol for our panic hook, which logs the panic.
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
fn tracing_panic_hook(info: &std::panic::PanicHookInfo) {
// following rust 1.66.1 std implementation:
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
let location = info.location();

View File

@@ -8,6 +8,7 @@ use tracing::{trace, warn};
use crate::lsn::Lsn;
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
///
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.

View File

@@ -65,6 +65,8 @@ impl<T> Poison<T> {
}
}
/// Armed pointer to a [`Poison`].
///
/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
/// Once modifications are done, use [`Self::disarm`].
/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned

View File

@@ -13,10 +13,11 @@ pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(pub u8);
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
/// Combination of ShardNumber and ShardCount.
///
/// For use within the context of a particular tenant, when we need to know which shard we're
/// dealing with, but do not need to know the full ShardIdentity (because we won't be doing
/// any page->shard mapping), and do not need to know the fully qualified TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
pub shard_number: ShardNumber,

View File

@@ -49,12 +49,11 @@ use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::watch;
///
/// Rcu allows multiple readers to read and hold onto a value without blocking
/// (for very long). Storing to the Rcu updates the value, making new readers
/// immediately see the new value, but it also waits for all current readers to
/// finish.
/// (for very long).
///
/// Storing to the Rcu updates the value, making new readers immediately see
/// the new value, but it also waits for all current readers to finish.
pub struct Rcu<V> {
inner: RwLock<RcuInner<V>>,
}

View File

@@ -5,7 +5,9 @@ use std::sync::{
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
/// `SemaphorePermit`, allowing use of `take` which does not require holding an outer mutex guard
/// `SemaphorePermit`.
///
/// Allows use of `take` which does not require holding an outer mutex guard
/// for the duration of initialization.
///
/// Has no unsafe, builds upon [`tokio::sync::Semaphore`] and [`std::sync::Mutex`].

View File

@@ -10,7 +10,7 @@ pub fn deserialize_item<T>(item: &toml_edit::Item) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let document: toml_edit::Document = match item {
let document: toml_edit::DocumentMut = match item {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()

View File

@@ -7,6 +7,7 @@ pub enum VecMapOrdering {
}
/// Ordered map datastructure implemented in a Vec.
///
/// Append only - can only add keys that are larger than the
/// current max key.
/// Ordering can be adjusted using [`VecMapOrdering`]

View File

@@ -6,9 +6,10 @@ pub enum YieldingLoopError {
Cancelled,
}
/// Helper for long synchronous loops, e.g. over all tenants in the system. Periodically
/// yields to avoid blocking the executor, and after resuming checks the provided
/// cancellation token to drop out promptly on shutdown.
/// Helper for long synchronous loops, e.g. over all tenants in the system.
///
/// Periodically yields to avoid blocking the executor, and after resuming
/// checks the provided cancellation token to drop out promptly on shutdown.
#[inline(always)]
pub async fn yielding_loop<I, T, F>(
interval: usize,
@@ -23,7 +24,7 @@ where
for (i, item) in iter.enumerate() {
visitor(item);
if i + 1 % interval == 0 {
if (i + 1) % interval == 0 {
tokio::task::yield_now().await;
if cancel.is_cancelled() {
return Err(YieldingLoopError::Cancelled);

View File

@@ -4,7 +4,6 @@
use std::{env, path::PathBuf, process::Command};
use anyhow::{anyhow, Context};
use bindgen::CargoCallbacks;
fn main() -> anyhow::Result<()> {
// Tell cargo to invalidate the built crate whenever the wrapper changes
@@ -64,16 +63,25 @@ fn main() -> anyhow::Result<()> {
.map_err(|s| anyhow!("Bad postgres server path {s:?}"))?
};
let unwind_abi_functions = [
"log_internal",
"recovery_download",
"start_streaming",
"finish_sync_safekeepers",
"wait_event_set",
"WalProposerStart",
];
// The bindgen::Builder is the main entry point
// to bindgen, and lets you build up options for
// the resulting bindings.
let bindings = bindgen::Builder::default()
let mut builder = bindgen::Builder::default()
// The input header we would like to generate
// bindings for.
.header("bindgen_deps.h")
// Tell cargo to invalidate the built crate whenever any of the
// included header files changed.
.parse_callbacks(Box::new(CargoCallbacks))
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
.allowlist_type("WalProposer")
.allowlist_type("WalProposerConfig")
.allowlist_type("walproposer_api")
@@ -105,7 +113,12 @@ fn main() -> anyhow::Result<()> {
.allowlist_var("WL_SOCKET_MASK")
.clang_arg("-DWALPROPOSER_LIB")
.clang_arg(format!("-I{pgxn_neon}"))
.clang_arg(format!("-I{inc_server_path}"))
.clang_arg(format!("-I{inc_server_path}"));
for name in unwind_abi_functions {
builder = builder.override_abi(bindgen::Abi::CUnwind, name);
}
let bindings = builder
// Finish the builder and generate the bindings.
.generate()
// Unwrap the Result and panic on failure.

View File

@@ -33,7 +33,7 @@ extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemStat
}
}
extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
extern "C-unwind" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
@@ -187,7 +187,7 @@ extern "C" fn conn_blocking_write(
}
}
extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
extern "C-unwind" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
unsafe {
let callback_data = (*(*(*sk).wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
@@ -272,7 +272,7 @@ extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
}
}
extern "C" fn wait_event_set(
extern "C-unwind" fn wait_event_set(
wp: *mut WalProposer,
timeout: ::std::os::raw::c_long,
event_sk: *mut *mut Safekeeper,
@@ -324,7 +324,7 @@ extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
}
}
extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
@@ -340,7 +340,7 @@ extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, sk: *mut Safekee
}
}
extern "C" fn log_internal(
extern "C-unwind" fn log_internal(
wp: *mut WalProposer,
level: ::std::os::raw::c_int,
line: *const ::std::os::raw::c_char,

View File

@@ -1,2 +1,20 @@
pub mod mgmt_api;
pub mod page_service;
/// For timeline_block_unblock_gc, distinguish the two different operations. This could be a bool.
// If file structure is per-kind not per-feature then where to put this?
#[derive(Clone, Copy)]
pub enum BlockUnblock {
Block,
Unblock,
}
impl std::fmt::Display for BlockUnblock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
BlockUnblock::Block => "block",
BlockUnblock::Unblock => "unblock",
};
f.write_str(s)
}
}

View File

@@ -12,6 +12,8 @@ use utils::{
pub use reqwest::Body as ReqwestBody;
use crate::BlockUnblock;
pub mod util;
#[derive(Debug, Clone)]
@@ -454,6 +456,20 @@ impl Client {
.map_err(Error::ReceiveBody)
}
pub async fn timeline_block_unblock_gc(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
dir: BlockUnblock,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
self.mgmt_api_endpoint,
);
self.request(Method::POST, &uri, ()).await.map(|_| ())
}
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{}/reset",

View File

@@ -174,7 +174,7 @@ async fn main() -> anyhow::Result<()> {
println!("specified prefix '{}' failed validation", cmd.prefix);
return Ok(());
};
let toml_document = toml_edit::Document::from_str(&cmd.config_toml_str)?;
let toml_document = toml_edit::DocumentMut::from_str(&cmd.config_toml_str)?;
let toml_item = toml_document
.get("remote_storage")
.expect("need remote_storage");

View File

@@ -37,6 +37,7 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::crashsafe::syncfs;
use utils::failpoint_support;
use utils::logging::TracingErrorLayerEnablement;
use utils::{
@@ -125,7 +126,6 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
// The tenants directory contains all the pageserver local disk state.
@@ -156,23 +156,7 @@ fn main() -> anyhow::Result<()> {
};
let started = Instant::now();
// Linux guarantees durability for syncfs.
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
#[cfg(target_os = "linux")]
{
use std::os::fd::AsRawFd;
nix::unistd::syncfs(dirfd.as_raw_fd()).context("syncfs")?;
}
#[cfg(target_os = "macos")]
{
// macOS is not a production platform for Neon, don't even bother.
drop(dirfd);
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
compile_error!("Unsupported OS");
}
syncfs(dirfd)?;
let elapsed = started.elapsed();
info!(
elapsed_ms = elapsed.as_millis(),

View File

@@ -174,16 +174,14 @@ pub struct PageServerConf {
pub l0_flush: crate::l0_flush::L0FlushConfig,
/// This flag is temporary and will be removed after gradual rollout.
/// See <https://github.com/neondatabase/neon/issues/8184>.
pub compact_level0_phase1_value_access: pageserver_api::config::CompactL0Phase1ValueAccess,
/// Direct IO settings
pub virtual_file_direct_io: virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
}
/// Token for authentication to safekeepers
///
/// We do not want to store this in a PageServerConf because the latter may be logged
/// and/or serialized at a whim, while the token is secret. Currently this token is the
/// same for accessing all tenants/timelines, but may become per-tenant/per-timeline in
@@ -338,7 +336,7 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
compact_level0_phase1_value_access,
compact_level0_phase1_value_access: _,
l0_flush,
virtual_file_direct_io,
concurrent_tenant_warmup,
@@ -383,7 +381,6 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
compact_level0_phase1_value_access,
virtual_file_direct_io,
io_buffer_alignment,
@@ -561,6 +558,16 @@ mod tests {
.expect("parse_and_validate");
}
#[test]
fn test_compactl0_phase1_access_mode_is_ignored_silently() {
let input = indoc::indoc! {r#"
[compact_level0_phase1_value_access]
mode = "streaming-kmerge"
validate = "key-lsn-value"
"#};
toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input).unwrap();
}
/// If there's a typo in the pageserver config, we'd rather catch that typo
/// and fail pageserver startup than silently ignoring the typo, leaving whoever
/// made it in the believe that their config change is effective.
@@ -637,14 +644,5 @@ mod tests {
// some_invalid_field = 23
// "#}
// );
test!(
compact_level0_phase1_value_access,
indoc! {r#"
[compact_level0_phase1_value_access]
mode = "streaming-kmerge"
some_invalid_field = 23
"#}
);
}
}

View File

@@ -1,7 +1,9 @@
//! This module defines `RequestContext`, a structure that we use throughout
//! the pageserver to propagate high-level context from places
//! that _originate_ activity down to the shared code paths at the
//! heart of the pageserver. It's inspired by Golang's `context.Context`.
//! Defines [`RequestContext`].
//!
//! It is a structure that we use throughout the pageserver to propagate
//! high-level context from places that _originate_ activity down to the
//! shared code paths at the heart of the pageserver. It's inspired by
//! Golang's `context.Context`.
//!
//! For example, in `Timeline::get(page_nr, lsn)` we need to answer the following questions:
//! 1. What high-level activity ([`TaskKind`]) needs this page?

View File

@@ -141,10 +141,24 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
m.other
);
let az_id = m
.other
.get("availability_zone_id")
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
let az_id = {
let az_id_from_metadata = m
.other
.get("availability_zone_id")
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
match az_id_from_metadata {
Some(az_id) => Some(az_id),
None => {
tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
conf.availability_zone.clone()
}
}
};
if az_id.is_none() {
panic!("Availablity zone id could not be inferred from metadata.json or pageserver config");
}
Some(NodeRegisterRequest {
node_id: conf.id,
@@ -152,7 +166,7 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
listen_pg_port: m.postgres_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
availability_zone_id: az_id,
availability_zone_id: az_id.expect("Checked above"),
})
}
Err(e) => {

View File

@@ -9,7 +9,7 @@ use metrics::{
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use strum::{EnumCount, VariantNames};
use strum_macros::{EnumVariantNames, IntoStaticStr};
use strum_macros::{IntoStaticStr, VariantNames};
use tracing::warn;
use utils::id::TimelineId;
@@ -27,7 +27,7 @@ const CRITICAL_OP_BUCKETS: &[f64] = &[
];
// Metrics collected on operations on the storage repository.
#[derive(Debug, EnumVariantNames, IntoStaticStr)]
#[derive(Debug, VariantNames, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum StorageTimeOperation {
#[strum(serialize = "layer flush")]

View File

@@ -1199,7 +1199,6 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,

View File

@@ -1021,9 +1021,10 @@ impl Timeline {
}
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
/// by a one WAL record appear atomic.
/// updates to the repository.
///
/// It is created by the 'begin_record' function. It is called for each WAL
/// record, so that all the modifications by a one WAL record appear atomic.
pub struct DatadirModification<'a> {
/// The timeline this modification applies to. You can access this to
/// read the state, but note that any pending updates are *not* reflected
@@ -1204,6 +1205,13 @@ impl<'a> DatadirModification<'a> {
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
}
@@ -1215,14 +1223,34 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(key, Value::Image(img));
Ok(())
}
pub(crate) fn put_rel_page_image_zero(&mut self, rel: RelTag, blknum: BlockNumber) {
self.pending_zero_data_pages
.insert(rel_block_to_key(rel, blknum).to_compact());
pub(crate) fn put_rel_page_image_zero(
&mut self,
rel: RelTag,
blknum: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
pub(crate) fn put_slru_page_image_zero(
@@ -1230,10 +1258,18 @@ impl<'a> DatadirModification<'a> {
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) {
self.pending_zero_data_pages
.insert(slru_block_to_key(kind, segno, blknum).to_compact());
) -> anyhow::Result<()> {
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
/// Call this at the end of each WAL record.
@@ -2048,6 +2084,7 @@ impl<'a> DatadirModification<'a> {
/// This struct facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
///
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the

View File

@@ -1,8 +1,9 @@
//! Timeline repository implementation that keeps old data in layer files, and
//! the recent changes in ephemeral files.
//!
//! Timeline repository implementation that keeps old data in files on disk, and
//! the recent changes in memory. See tenant/*_layer.rs files.
//! The functions here are responsible for locating the correct layer for the
//! get/put call, walking back the timeline branching history as needed.
//! See tenant/*_layer.rs files. The functions here are responsible for locating
//! the correct layer for the get/put call, walking back the timeline branching
//! history as needed.
//!
//! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
//! directory. See docs/pageserver-storage.md for how the files are managed.
@@ -7090,13 +7091,13 @@ mod tests {
vec![
// Image layer at GC horizon
PersistentLayerKey {
key_range: Key::MIN..Key::NON_L0_MAX,
key_range: Key::MIN..Key::MAX,
lsn_range: Lsn(0x30)..Lsn(0x31),
is_delta: false
},
// The delta layer covers the full range (with the layer key hack to avoid being recognized as L0)
// The delta layer below the horizon
PersistentLayerKey {
key_range: Key::MIN..Key::NON_L0_MAX,
key_range: get_key(3)..get_key(4),
lsn_range: Lsn(0x30)..Lsn(0x48),
is_delta: true
},

View File

@@ -452,7 +452,8 @@ impl TryFrom<toml_edit::Item> for TenantConfOpt {
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
}
toml_edit::Item::Table(table) => {
let deserializer = toml_edit::de::Deserializer::new(table.into());
let deserializer =
toml_edit::de::Deserializer::from(toml_edit::DocumentMut::from(table));
return serde_path_to_error::deserialize(deserializer)
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
}

View File

@@ -1,7 +1,8 @@
//! Describes the legacy now hopefully no longer modified per-timeline metadata stored in
//! `index_part.json` managed by [`remote_timeline_client`]. For many tenants and their timelines,
//! this struct and it's original serialization format is still needed because they were written a
//! long time ago.
//! Describes the legacy now hopefully no longer modified per-timeline metadata.
//!
//! It is stored in `index_part.json` managed by [`remote_timeline_client`]. For many tenants and
//! their timelines, this struct and its original serialization format is still needed because
//! they were written a long time ago.
//!
//! Instead of changing and adding versioning to this, just change [`IndexPart`] with soft json
//! versioning.

View File

@@ -282,9 +282,10 @@ impl BackgroundPurges {
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
/// The TenantManager is responsible for storing and mutating the collection of all tenants
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
/// lives inside the TenantManager.
/// Responsible for storing and mutating the collection of all tenants
/// that this pageserver has state for.
///
/// Every Tenant and SecondaryTenant instance lives inside the TenantManager.
///
/// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
/// the same tenant twice concurrently, or trying to configure the same tenant into secondary
@@ -2346,8 +2347,9 @@ pub enum TenantMapError {
ShuttingDown,
}
/// Guards a particular tenant_id's content in the TenantsMap. While this
/// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
/// Guards a particular tenant_id's content in the TenantsMap.
///
/// While this structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
/// for this tenant, which acts as a marker for any operations targeting
/// this tenant to retry later, or wait for the InProgress state to end.
///

View File

@@ -2184,6 +2184,8 @@ pub fn remote_timeline_path(
remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
}
/// Obtains the path of the given Layer in the remote
///
/// Note that the shard component of a remote layer path is _not_ always the same
/// as in the TenantShardId of the caller: tenants may reference layers from a different
/// ShardIndex. Use the ShardIndex from the layer's metadata.

View File

@@ -548,7 +548,7 @@ pub(crate) async fn download_initdb_tar_zst(
cancel,
)
.await
.map_err(|e| {
.inspect_err(|_e| {
// Do a best-effort attempt at deleting the temporary file upon encountering an error.
// We don't have async here nor do we want to pile on any extra errors.
if let Err(e) = std::fs::remove_file(&temp_path) {
@@ -556,7 +556,6 @@ pub(crate) async fn download_initdb_tar_zst(
warn!("error deleting temporary file {temp_path}: {e}");
}
}
e
})?;
Ok((temp_path, file))

View File

@@ -1,4 +1,5 @@
//! In-memory index to track the tenant files on the remote storage.
//!
//! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about
//! remote timeline layers and its metadata.

View File

@@ -434,10 +434,11 @@ impl ReadableLayer {
}
}
/// Layers contain a hint indicating whether they are likely to be used for reads. This is a hint rather
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should
/// be used for cache management but not for correctness-critical checks.
/// Layers contain a hint indicating whether they are likely to be used for reads.
///
/// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
/// when changing the visibility of layers (for example when creating a branch that makes some previously
/// covered layers visible). It should be used for cache management but not for correctness-critical checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LayerVisibilityHint {
/// A Visible layer might be read while serving a read, because there is not an image layer between it

View File

@@ -136,10 +136,11 @@ impl Summary {
// Flag indicating that this version initialize the page
const WILL_INIT: u64 = 1;
/// Struct representing reference to BLOB in layers. Reference contains BLOB
/// offset, and for WAL records it also contains `will_init` flag. The flag
/// helps to determine the range of records that needs to be applied, without
/// reading/deserializing records themselves.
/// Struct representing reference to BLOB in layers.
///
/// Reference contains BLOB offset, and for WAL records it also contains
/// `will_init` flag. The flag helps to determine the range of records
/// that needs to be applied, without reading/deserializing records themselves.
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(pub u64);

View File

@@ -1,7 +1,9 @@
//! An ImageLayer represents an image or a snapshot of a key-range at
//! one particular LSN. It contains an image of all key-value pairs
//! in its key-range. Any key that falls into the image layer's range
//! but does not exist in the layer, does not exist.
//! one particular LSN.
//!
//! It contains an image of all key-value pairs in its key-range. Any key
//! that falls into the image layer's range but does not exist in the layer,
//! does not exist.
//!
//! An image layer is stored in a file on disk. The file is stored in
//! timelines/<timeline_id> directory. Currently, there are no

View File

@@ -12,8 +12,10 @@ use serde::{Deserialize, Serialize};
#[cfg(test)]
use utils::id::TenantId;
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
/// A unique identifier of a persistent layer.
///
/// This is different from `LayerDescriptor`, which is only used in the benchmarks.
/// This struct contains all necessary information to find the image / delta layer. It also provides
/// a unified way to generate layer information like file name.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)]
pub struct PersistentLayerDesc {

View File

@@ -217,8 +217,9 @@ impl fmt::Display for ImageLayerName {
}
}
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time. The
/// LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time.
///
/// The LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
/// over time (e.g. across shard splits or compression). The physical filenames of layers in local
/// storage and object names in remote storage consist of the LayerName plus some extra qualifiers
/// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path])

View File

@@ -226,9 +226,11 @@ impl<'a> IteratorWrapper<'a> {
}
}
/// A merge iterator over delta/image layer iterators. When duplicated records are
/// found, the iterator will not perform any deduplication, and the caller should handle
/// these situation. By saying duplicated records, there are many possibilities:
/// A merge iterator over delta/image layer iterators.
///
/// When duplicated records are found, the iterator will not perform any
/// deduplication, and the caller should handle these situation. By saying
/// duplicated records, there are many possibilities:
///
/// * Two same delta at the same LSN.
/// * Two same image at the same LSN.

View File

@@ -34,9 +34,10 @@ impl SplitWriterResult {
}
}
/// An image writer that takes images and produces multiple image layers. The interface does not
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
/// to be cleaned up)
/// An image writer that takes images and produces multiple image layers.
///
/// The interface does not guarantee atomicity (i.e., if the image layer generation
/// fails, there might be leftover files to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
@@ -187,22 +188,23 @@ impl SplitImageLayerWriter {
.await
}
/// When split writer fails, the caller should call this function and handle partially generated layers.
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
/// to be cleaned up).
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
///
/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
/// there might be leftover files to be cleaned up).
///
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
inner: DeltaLayerWriter,
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
generated_layers: Vec<SplitWriterResult>,
conf: &'static PageServerConf,
@@ -210,7 +212,6 @@ pub struct SplitDeltaLayerWriter {
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
last_key_written: Key,
start_key: Key,
}
impl SplitDeltaLayerWriter {
@@ -218,29 +219,18 @@ impl SplitDeltaLayerWriter {
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_key: Key,
lsn_range: Range<Lsn>,
target_layer_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: DeltaLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
start_key,
lsn_range.clone(),
ctx,
)
.await?,
inner: None,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn_range,
last_key_written: Key::MIN,
start_key,
})
}
@@ -263,9 +253,26 @@ impl SplitDeltaLayerWriter {
//
// Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
// strategy. https://github.com/neondatabase/neon/issues/8837
if self.inner.is_none() {
self.inner = Some((
key,
DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
key,
self.lsn_range.clone(),
ctx,
)
.await?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
if inner.num_keys() >= 1
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
if key != self.last_key_written {
let next_delta_writer = DeltaLayerWriter::new(
@@ -277,13 +284,13 @@ impl SplitDeltaLayerWriter {
ctx,
)
.await?;
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
let (start_key, prev_delta_writer) =
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
let layer_key = PersistentLayerKey {
key_range: self.start_key..key,
key_range: start_key..key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
self.start_key = key;
if discard(&layer_key).await {
drop(prev_delta_writer);
self.generated_layers
@@ -294,17 +301,18 @@ impl SplitDeltaLayerWriter {
self.generated_layers
.push(SplitWriterResult::Produced(delta_layer));
}
} else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
self.inner.estimated_size()
inner.estimated_size()
);
}
}
self.last_key_written = key;
self.inner.put_value(key, lsn, val, ctx).await
let (_, inner) = self.inner.as_mut().unwrap();
inner.put_value(key, lsn, val, ctx).await
}
pub async fn put_value(
@@ -323,7 +331,6 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
discard: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
where
@@ -335,11 +342,15 @@ impl SplitDeltaLayerWriter {
inner,
..
} = self;
let Some((start_key, inner)) = inner else {
return Ok(generated_layers);
};
if inner.num_keys() == 0 {
return Ok(generated_layers);
}
let end_key = self.last_key_written.next();
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
key_range: start_key..end_key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
@@ -358,15 +369,14 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<SplitWriterResult>> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}
/// When split writer fails, the caller should call this function and handle partially generated layers.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
Ok((self.generated_layers, self.inner))
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
Ok((self.generated_layers, self.inner.map(|x| x.1)))
}
}
@@ -430,10 +440,8 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -458,11 +466,22 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
assert_eq!(layers.len(), 1);
assert_eq!(
layers
.into_iter()
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}
#[tokio::test]
@@ -499,10 +518,8 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -531,10 +548,7 @@ mod tests {
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
if discard {
for layer in image_layers {
layer.into_discarded_layer();
@@ -553,6 +567,14 @@ mod tests {
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(
delta_layers.first().unwrap().layer_desc().key_range.start,
get_key(0)
);
assert_eq!(
delta_layers.last().unwrap().layer_desc().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
@@ -600,10 +622,8 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&ctx,
)
.await
.unwrap();
@@ -642,11 +662,35 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
assert_eq!(layers.len(), 2);
let mut layers_iter = layers.into_iter();
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(1)..get_key(2),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}
#[tokio::test]
@@ -666,10 +710,8 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -687,10 +729,20 @@ mod tests {
.await
.unwrap();
}
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
assert_eq!(delta_layers.len(), 1);
let delta_layer = delta_layers
.into_iter()
.next()
.unwrap()
.into_resident_layer();
assert_eq!(
delta_layer.layer_desc().key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
is_delta: true
}
);
}
}

View File

@@ -19,7 +19,6 @@ use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::config::{CompactL0BypassPageCacheValidation, CompactL0Phase1ValueAccess};
use pageserver_api::key::KEY_SIZE;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
@@ -912,137 +911,13 @@ impl Timeline {
// we're compacting, in key, LSN order.
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
// then the Value::Image is ordered before Value::WalRecord.
//
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
// option and validation code once we've reached confidence.
enum AllValuesIter<'a> {
PageCachedBlobIo {
all_keys_iter: VecIter<'a>,
},
StreamingKmergeBypassingPageCache {
merge_iter: MergeIterator<'a>,
},
ValidatingStreamingKmergeBypassingPageCache {
mode: CompactL0BypassPageCacheValidation,
merge_iter: MergeIterator<'a>,
all_keys_iter: VecIter<'a>,
},
}
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
impl AllValuesIter<'_> {
async fn next_all_keys_iter(
iter: &mut VecIter<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
let Some(DeltaEntry {
key,
lsn,
val: value_ref,
..
}) = iter.next()
else {
return Ok(None);
};
let value = value_ref.load(ctx).await?;
Ok(Some((*key, *lsn, value)))
}
async fn next(
&mut self,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
match self {
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
Self::next_all_keys_iter(iter, ctx).await
}
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
// advance both iterators
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
let merge_iter_item = merge_iter.next().await;
// compare results & log warnings as needed
macro_rules! rate_limited_warn {
($($arg:tt)*) => {{
if cfg!(debug_assertions) || cfg!(feature = "testing") {
warn!($($arg)*);
panic!("CompactL0BypassPageCacheValidation failure, check logs");
}
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
use std::sync::Mutex;
use std::time::Duration;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!($($arg)*);
});
}}
}
match (&all_keys_iter_item, &merge_iter_item) {
(Err(_), Err(_)) => {
// don't bother asserting equivality of the errors
}
(Err(all_keys), Ok(merge)) => {
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
},
(Ok(all_keys), Err(merge)) => {
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
},
(Ok(None), Ok(None)) => { }
(Ok(Some(all_keys)), Ok(None)) => {
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
}
(Ok(None), Ok(Some(merge))) => {
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
}
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
match mode {
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
CompactL0BypassPageCacheValidation::KeyLsn => {
let all_keys = (all_keys_key, all_keys_lsn);
let merge = (merge_key, merge_lsn);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
}
}
CompactL0BypassPageCacheValidation::KeyLsnValue => {
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
let merge = (merge_key, merge_lsn, merge_value);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
}
}
}
}
}
// in case of mismatch, trust the legacy all_keys_iter_item
all_keys_iter_item
}.instrument(info_span!("next")).await
}
}
}
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
all_keys_iter: all_keys.iter(),
},
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
let merge_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
match validate {
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
mode: validate.clone(),
merge_iter,
all_keys_iter: all_keys.iter(),
},
}
let mut all_values_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
// This iterator walks through all keys and is needed to calculate size used by each key
@@ -1119,7 +994,7 @@ impl Timeline {
let mut keys = 0;
while let Some((key, lsn, value)) = all_values_iter
.next(ctx)
.next()
.await
.map_err(CompactionError::Other)?
{
@@ -1934,7 +1809,6 @@ impl Timeline {
.unwrap();
// We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
// as an L0 layer.
let hack_end_key = Key::NON_L0_MAX;
let mut delta_layers = Vec::new();
let mut image_layers = Vec::new();
let mut downloaded_layers = Vec::new();
@@ -1980,10 +1854,8 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
Key::MIN,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
ctx,
)
.await?;
@@ -2090,7 +1962,7 @@ impl Timeline {
let produced_image_layers = if let Some(writer) = image_layer_writer {
if !dry_run {
writer
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
.await?
} else {
let (layers, _) = writer.take()?;
@@ -2103,7 +1975,7 @@ impl Timeline {
let produced_delta_layers = if !dry_run {
delta_layer_writer
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.finish_with_discard_fn(self, ctx, discard)
.await?
} else {
let (layers, _) = delta_layer_writer.take()?;

View File

@@ -593,8 +593,10 @@ impl<'a> VectoredBlobReader<'a> {
}
}
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
/// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
///
/// It provides a streaming API for getting read blobs. It returns a batch when
/// `handle` gets called and when the current key would just exceed the read_size and
/// max_cnt constraints.
pub struct StreamingVectoredReadPlanner {
read_builder: Option<VectoredReadBuilder>,

View File

@@ -1,6 +1,7 @@
//!
//! VirtualFile is like a normal File, but it's not bound directly to
//! a file descriptor. Instead, the file is opened when it's read from,
//! a file descriptor.
//!
//! Instead, the file is opened when it's read from,
//! and if too many files are open globally in the system, least-recently
//! used ones are closed.
//!

View File

@@ -25,9 +25,7 @@ use std::time::Duration;
use std::time::SystemTime;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::TimestampTz;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::{bail, Context, Result};
@@ -48,16 +46,31 @@ use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
enum_pgversion! {CheckPoint, pgv::CheckPoint}
impl CheckPoint {
fn encode(&self) -> Result<Bytes, SerializeError> {
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
}
fn update_next_xid(&mut self, xid: u32) -> bool {
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
}
pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
enum_pgversion_dispatch!(self, CheckPoint, cp, {
cp.update_next_multixid(multi_xid, multi_offset)
})
}
}
pub struct WalIngest {
shard: ShardIdentity,
pg_version: u32,
checkpoint: CheckPoint,
checkpoint_modified: bool,
warn_ingest_lag: WarnIngestLag,
@@ -78,12 +91,16 @@ impl WalIngest {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
let pgversion = timeline.pg_version;
let checkpoint = dispatch_pgversion!(pgversion, {
let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
<pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
});
Ok(WalIngest {
shard: *timeline.get_shard_identity(),
pg_version: timeline.pg_version,
checkpoint,
checkpoint_modified: false,
warn_ingest_lag: WarnIngestLag {
@@ -117,7 +134,7 @@ impl WalIngest {
modification.set_lsn(lsn)?;
if decoded.is_dbase_create_copy(self.pg_version) {
if decoded.is_dbase_create_copy(pg_version) {
// Records of this type should always be preceded by a commit(), as they
// rely on reading data pages back from the Timeline.
assert!(!modification.has_dirty_data_pages());
@@ -337,70 +354,67 @@ impl WalIngest {
pg_constants::RM_XLOG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if self.checkpoint.nextOid != next_oid {
self.checkpoint.nextOid = next_oid;
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if cp.nextOid != next_oid {
cp.nextOid = next_oid;
self.checkpoint_modified = true;
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
cp.oldestXid
);
if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
cp.oldestXid = xlog_checkpoint.oldestXid;
}
trace!(
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
xlog_checkpoint.oldestActiveXid,
cp.oldestActiveXid
);
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = cp.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
cp.oldestActiveXid = oldest_active_xid;
} else {
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
// have some trace of the checkpoint records in the layer files at the same
// LSNs.
self.checkpoint_modified = true;
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
self.checkpoint.oldestXid
);
if (self
.checkpoint
.oldestXid
.wrapping_sub(xlog_checkpoint.oldestXid) as i32)
< 0
{
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
}
trace!(
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
self.checkpoint.oldestActiveXid = oldest_active_xid;
} else {
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
// have some trace of the checkpoint records in the layer files at the same
// LSNs.
self.checkpoint_modified = true;
}
});
}
pg_constants::RM_LOGICALMSG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
@@ -424,7 +438,11 @@ impl WalIngest {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestActiveXid = xlrec.oldest_running_xid;
});
self.checkpoint_modified = true;
}
}
@@ -539,7 +557,7 @@ impl WalIngest {
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
// do not materialize null pages because them most likely be soon replaced with real data
@@ -1204,7 +1222,7 @@ impl WalIngest {
if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
// Tail of last remaining FSM page has to be zeroed.
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
fsm_physical_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1226,7 +1244,7 @@ impl WalIngest {
if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
// Tail of last remaining vm page has to be zeroed.
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, vm_page_no);
modification.put_rel_page_image_zero(rel, vm_page_no)?;
vm_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1242,12 +1260,17 @@ impl WalIngest {
fn warn_on_ingest_lag(
&mut self,
conf: &crate::config::PageServerConf,
wal_timestmap: TimestampTz,
wal_timestamp: TimestampTz,
) {
debug_assert_current_span_has_tenant_and_timeline_id();
let now = SystemTime::now();
let rate_limits = &mut self.warn_ingest_lag;
match try_from_pg_timestamp(wal_timestmap) {
let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
});
match ts {
Ok(ts) => {
match now.duration_since(ts) {
Ok(lag) => {
@@ -1257,7 +1280,7 @@ impl WalIngest {
warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
})
}
},
}
Err(e) => {
let delta_t = e.duration();
// determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
@@ -1271,7 +1294,6 @@ impl WalIngest {
}
}
};
}
Err(error) => {
rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
@@ -1379,14 +1401,17 @@ impl WalIngest {
// truncated, but a checkpoint record with the updated values isn't written until
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
// so we keep the oldestXid and oldestXidDB up-to-date.
self.checkpoint.oldestXid = xlrec.oldest_xid;
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestXid = xlrec.oldest_xid;
cp.oldestXidDB = xlrec.oldest_xid_db;
});
self.checkpoint_modified = true;
// TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
let latest_page_number =
self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
/ pg_constants::CLOG_XACTS_PER_PAGE;
// Now delete all segments containing pages between xlrec.pageno
// and latest_page_number.
@@ -1394,7 +1419,9 @@ impl WalIngest {
// First, make an important safety check:
// the current endpoint page must not be eligible for removal.
// See SimpleLruTruncate() in slru.c
if clogpage_precedes(latest_page_number, xlrec.pageno) {
if dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno)
}) {
info!("could not truncate directory pg_xact apparent wraparound");
return Ok(());
}
@@ -1411,7 +1438,12 @@ impl WalIngest {
.await?
{
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno)
});
if may_delete {
modification
.drop_slru_segment(SlruKind::Clog, segno, ctx)
.await?;
@@ -1530,14 +1562,23 @@ impl WalIngest {
xlrec: &XlMultiXactTruncate,
ctx: &RequestContext,
) -> Result<()> {
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
let (maxsegment, startsegment, endsegment) =
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestMulti = xlrec.end_trunc_off;
cp.oldestMultiDB = xlrec.oldest_multi_db;
let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
pg_constants::MAX_MULTIXACT_OFFSET,
);
let startsegment: i32 =
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
let endsegment: i32 =
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
(maxsegment, startsegment, endsegment)
});
self.checkpoint_modified = true;
// PerformMembersTruncation
let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
let mut segment: i32 = startsegment;
// Delete all the segments except the last one. The last segment can still
@@ -1696,7 +1737,7 @@ impl WalIngest {
continue;
}
modification.put_rel_page_image_zero(rel, gap_blknum);
modification.put_rel_page_image_zero(rel, gap_blknum)?;
}
}
Ok(())
@@ -1762,7 +1803,7 @@ impl WalIngest {
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
modification.put_slru_page_image_zero(kind, segno, gap_blknum);
modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
}
}
Ok(())
@@ -1811,11 +1852,23 @@ mod tests {
// TODO
}
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
#[tokio::test]
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
for i in 14..=16 {
dispatch_pgversion!(i, {
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
});
}
Ok(())
}
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_checkpoint(dispatch_pgversion!(
tline.pg_version,
pgv::ZERO_CHECKPOINT.clone()
))?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit(ctx).await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;

View File

@@ -43,13 +43,12 @@ use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use utils::sync::heavier_once_cell;
/// The real implementation that uses a Postgres process to
/// perform WAL replay.
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
///
/// Only one thread can use the process at a time, that is controlled by the
/// Mutex. In the future, we might want to launch a pool of processes to allow
/// concurrent replay of multiple records.
pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,

View File

@@ -1038,9 +1038,12 @@ DetermineEpochStartLsn(WalProposer *wp)
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
{
/*
* However, allow to proceed if previously elected leader was me;
* plain restart of walproposer not intervened by concurrent
* compute (who could generate WAL) is ok.
* However, allow to proceed if last_log_term on the node which gave
* the highest vote (i.e. point where we are going to start writing)
* actually had been won by me; plain restart of walproposer not
* intervened by concurrent compute which wrote WAL is ok.
*
* This avoids compute crash after manual term_bump.
*/
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
@@ -1442,12 +1445,17 @@ RecvAppendResponses(Safekeeper *sk)
if (sk->appendResponse.term > wp->propTerm)
{
/*
* Another compute with higher term is running. Panic to restart
* PG as we likely need to retake basebackup. However, don't dump
* core as this is kinda expected scenario.
*
* Term has changed to higher one, probably another compute is
* running. If this is the case we could PANIC as well because
* likely it inserted some data and our basebackup is unsuitable
* anymore. However, we also bump term manually (term_bump endpoint)
* on safekeepers for migration purposes, in this case we do want
* compute to stay alive. So restart walproposer with FATAL instead
* of panicking; if basebackup is spoiled next election will notice
* this.
*/
disable_core_dump();
wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
}

View File

@@ -16,7 +16,7 @@ use tracing::debug;
// On the other hand, `hashlink` has good download stats and appears to be maintained.
use hashlink::{linked_hash_map::RawEntryMut, LruCache};
use super::{common::Cached, *};
use super::{common::Cached, timed_lru, Cache};
/// An implementation of timed LRU cache with fixed capacity.
/// Key properties:

View File

@@ -78,7 +78,7 @@ pub(crate) type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,

View File

@@ -38,10 +38,7 @@ impl Api {
locks: &'static ApiLocks<EndpointCacheKey>,
wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
) -> Self {
let jwt = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
Ok(v) => v,
Err(_) => String::new(),
};
let jwt = std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN").unwrap_or_default();
Self {
endpoint,
caches,

View File

@@ -598,15 +598,15 @@ mod tests {
assert_eq!(
file_stats,
[
(1315874, 3, 6000),
(1315867, 3, 6000),
(1315927, 3, 6000),
(1315884, 3, 6000),
(1316014, 3, 6000),
(1315856, 3, 6000),
(1315648, 3, 6000),
(1315884, 3, 6000),
(438913, 1, 2000)
(1312632, 3, 6000),
(1312621, 3, 6000),
(1312680, 3, 6000),
(1312637, 3, 6000),
(1312773, 3, 6000),
(1312610, 3, 6000),
(1312404, 3, 6000),
(1312639, 3, 6000),
(437848, 1, 2000)
]
);
@@ -638,11 +638,11 @@ mod tests {
assert_eq!(
file_stats,
[
(1208861, 5, 10000),
(1208592, 5, 10000),
(1208885, 5, 10000),
(1208873, 5, 10000),
(1209128, 5, 10000)
(1203465, 5, 10000),
(1203189, 5, 10000),
(1203490, 5, 10000),
(1203475, 5, 10000),
(1203729, 5, 10000)
]
);
@@ -667,15 +667,15 @@ mod tests {
assert_eq!(
file_stats,
[
(1315874, 3, 6000),
(1315867, 3, 6000),
(1315927, 3, 6000),
(1315884, 3, 6000),
(1316014, 3, 6000),
(1315856, 3, 6000),
(1315648, 3, 6000),
(1315884, 3, 6000),
(438913, 1, 2000)
(1312632, 3, 6000),
(1312621, 3, 6000),
(1312680, 3, 6000),
(1312637, 3, 6000),
(1312773, 3, 6000),
(1312610, 3, 6000),
(1312404, 3, 6000),
(1312639, 3, 6000),
(437848, 1, 2000)
]
);
@@ -712,7 +712,7 @@ mod tests {
// files are smaller than the size threshold, but they took too long to fill so were flushed early
assert_eq!(
file_stats,
[(659836, 2, 3001), (659550, 2, 3000), (659346, 2, 2999)]
[(657696, 2, 3001), (657410, 2, 3000), (657206, 2, 2999)]
);
tmpdir.close().unwrap();

View File

@@ -44,16 +44,14 @@
clippy::items_after_statements,
)]
// List of temporarily allowed lints.
// TODO: Switch to except() once stable with 1.81.
// TODO: fix code and reduce list or move to permanent list above.
#![allow(
#![expect(
clippy::cargo_common_metadata,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::doc_markdown,
clippy::implicit_hasher,
clippy::inline_always,
clippy::match_same_arms,
clippy::match_wild_err_arm,
@@ -61,21 +59,28 @@
clippy::missing_panics_doc,
clippy::module_name_repetitions,
clippy::needless_pass_by_value,
clippy::needless_raw_string_hashes,
clippy::redundant_closure_for_method_calls,
clippy::return_self_not_must_use,
clippy::similar_names,
clippy::single_match_else,
clippy::struct_excessive_bools,
clippy::struct_field_names,
clippy::too_many_lines,
clippy::unreadable_literal,
clippy::unused_async,
clippy::unused_self,
clippy::wildcard_imports
clippy::unused_self
)]
#![cfg_attr(
any(test, feature = "testing"),
allow(
clippy::needless_raw_string_hashes,
clippy::unreadable_literal,
clippy::unused_async,
)
)]
// List of temporarily allowed lints to unblock beta/nightly.
#![allow(unknown_lints, clippy::manual_inspect)]
#![allow(
unknown_lints,
// TODO: 1.82: Add `use<T>` where necessary and remove from this list.
impl_trait_overcaptures,
)]
use std::{convert::Infallible, future::Future};

View File

@@ -217,6 +217,7 @@ impl sasl::Mechanism for Exchange<'_> {
self.state = ExchangeState::SaltSent(sent);
Ok(Step::Continue(self, msg))
}
#[allow(unreachable_patterns)] // TODO: 1.82: simply drop this match
Step::Success(x, _) => match x {},
Step::Failure(msg) => Ok(Step::Failure(msg)),
}
@@ -224,6 +225,7 @@ impl sasl::Mechanism for Exchange<'_> {
ExchangeState::SaltSent(sent) => {
match sent.transition(self.secret, &self.tls_server_end_point, input)? {
Step::Success(keys, msg) => Ok(Step::Success(keys, msg)),
#[allow(unreachable_patterns)] // TODO: 1.82: simply drop this match
Step::Continue(x, _) => match x {},
Step::Failure(msg) => Ok(Step::Failure(msg)),
}

View File

@@ -745,22 +745,20 @@ impl BatchQueryData {
builder = builder.deferrable(true);
}
let transaction = builder.start().await.map_err(|e| {
let transaction = builder.start().await.inspect_err(|_| {
// if we cannot start a transaction, we should return immediately
// and not return to the pool. connection is clearly broken
discard.discard();
e
})?;
let json_output =
match query_batch(cancel.child_token(), &transaction, self, parsed_headers).await {
Ok(json_output) => {
info!("commit");
let status = transaction.commit().await.map_err(|e| {
let status = transaction.commit().await.inspect_err(|_| {
// if we cannot commit - for now don't return connection to pool
// TODO: get a query status from the error
discard.discard();
e
})?;
discard.check_idle(status);
json_output
@@ -776,11 +774,10 @@ impl BatchQueryData {
}
Err(err) => {
info!("rollback");
let status = transaction.rollback().await.map_err(|e| {
let status = transaction.rollback().await.inspect_err(|_| {
// if we cannot rollback - for now don't return connection to pool
// TODO: get a query status from the error
discard.discard();
e
})?;
discard.check_idle(status);
return Err(err);

View File

@@ -14,6 +14,7 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;
/// Stream wrapper which implements libpq's protocol.
///
/// NOTE: This object deliberately doesn't implement [`AsyncRead`]
/// or [`AsyncWrite`] to prevent subtle errors (e.g. trying
/// to pass random malformed bytes through the connection).

View File

@@ -1,7 +1,7 @@
[toolchain]
channel = "1.80.1"
channel = "1.81.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html
# but we also need `llvm-tools-preview` for coverage data merges on CI
components = ["llvm-tools-preview", "rustfmt", "clippy"]
# but we also need `llvm-tools` for coverage data merges on CI
components = ["llvm-tools", "rustfmt", "clippy"]

View File

@@ -1,6 +1,9 @@
use utils::auth::{AuthError, Claims, Scope};
use utils::id::TenantId;
/// If tenant_id is provided, allow if token (claims) is for this tenant or
/// whole safekeeper scope (SafekeeperData). Else, allow only if token is
/// SafekeeperData.
pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<(), AuthError> {
match (&claims.scope, tenant_id) {
(Scope::Tenant, None) => Err(AuthError(

View File

@@ -19,7 +19,7 @@ use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use storage_broker::Uri;
use tracing::*;
@@ -261,6 +261,15 @@ async fn main() -> anyhow::Result<()> {
// Change into the data directory.
std::env::set_current_dir(&workdir)?;
// Prevent running multiple safekeepers on the same directory
let lock_file_path = workdir.join(PID_FILE_NAME);
let lock_file =
pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
info!("claimed pid file at {lock_file_path:?}");
// ensure that the lock file is held even if the main thread of the process is panics
// we need to release the lock file only when the current process is gone
std::mem::forget(lock_file);
// Set or read our ID.
let id = set_id(&workdir, args.id.map(NodeId))?;
if args.init {
@@ -364,15 +373,15 @@ async fn main() -> anyhow::Result<()> {
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
// Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file =
pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
info!("claimed pid file at {lock_file_path:?}");
// ensure that the lock file is held even if the main thread of the process is panics
// we need to release the lock file only when the current process is gone
std::mem::forget(lock_file);
// fsync the datadir to make sure we have a consistent state on disk.
let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
let started = Instant::now();
utils::crashsafe::syncfs(dfd)?;
let elapsed = started.elapsed();
info!(
elapsed_ms = elapsed.as_millis(),
"syncfs data directory done"
);
info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {

View File

@@ -2,6 +2,7 @@
//! protocol commands.
use anyhow::Context;
use std::future::Future;
use std::str::{self, FromStr};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -95,7 +96,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
for SafekeeperPostgresHandler
{
@@ -197,49 +197,51 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
Ok(())
}
async fn process_query(
fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError> {
if query_string
.to_ascii_lowercase()
.starts_with("set datestyle to ")
{
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
return Ok(());
}
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
info!("got query {:?}", query_string);
let tenant_id = self.tenant_id.context("tenantid is required")?;
let timeline_id = self.timeline_id.context("timelineid is required")?;
self.check_permission(Some(tenant_id))?;
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
.instrument(info_span!("WAL receiver"))
.await
) -> impl Future<Output = Result<(), QueryError>> {
Box::pin(async move {
if query_string
.to_ascii_lowercase()
.starts_with("set datestyle to ")
{
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
return Ok(());
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
.instrument(info_span!("WAL sender"))
.await
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
info!("got query {:?}", query_string);
let tenant_id = self.tenant_id.context("tenantid is required")?;
let timeline_id = self.timeline_id.context("timelineid is required")?;
self.check_permission(Some(tenant_id))?;
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
.instrument(info_span!("WAL receiver"))
.await
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
.instrument(info_span!("WAL sender"))
.await
}
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd).await
}
}
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd).await
}
}
})
}
}

View File

@@ -18,8 +18,8 @@ use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWri
use utils::http::request::parse_query_param;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
use utils::{
auth::SwappableJwtAuth,
http::{
@@ -408,6 +408,28 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respons
json_response(StatusCode::OK, response)
}
/// Make term at least as high as one in request. If one in request is None,
/// increment current one.
async fn timeline_term_bump_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let response = tli
.term_bump(request_data.term)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
}
/// Used only in tests to hand craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -630,6 +652,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
|r| request_span(r, timeline_backup_partial_reset),
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
|r| request_span(r, timeline_term_bump_handler),
)
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)
})

View File

@@ -484,6 +484,7 @@ pub async fn validate_temp_timeline(
}
/// Move timeline from a temp directory to the main storage, and load it to the global map.
///
/// This operation is done under a lock to prevent bugs if several concurrent requests are
/// trying to load the same timeline. Note that it doesn't guard against creating the
/// timeline with the same ttid, but no one should be doing this anyway.

View File

@@ -448,8 +448,10 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx; reading from socket and writing to disk in parallel is
/// beneficial for performance, this struct provides writing to disk part.
/// replies to reply_tx.
///
/// Reading from socket and writing to disk in parallel is beneficial for
/// performance, this struct provides the writing to disk part.
pub struct WalAcceptor {
tli: WalResidentTimeline,
msg_rx: Receiver<ProposerAcceptorMessage>,

View File

@@ -938,8 +938,9 @@ where
}
trace!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
"processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
msg.wal_data.len(),
msg.h.begin_lsn,
msg.h.end_lsn,
msg.h.commit_lsn,
msg.h.truncate_lsn,

View File

@@ -758,9 +758,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
// pq_sendint32(&reply_message, xmin);
// pq_sendint32(&reply_message, xmin_epoch);
// So it is two big endian 32-bit words in low endian order!
hs_feedback.xmin = (hs_feedback.xmin >> 32) | (hs_feedback.xmin << 32);
hs_feedback.catalog_xmin =
(hs_feedback.catalog_xmin >> 32) | (hs_feedback.catalog_xmin << 32);
hs_feedback.xmin = hs_feedback.xmin.rotate_left(32);
hs_feedback.catalog_xmin = hs_feedback.catalog_xmin.rotate_left(32);
self.ws_guard
.walsenders
.record_hs_feedback(self.ws_guard.id, &hs_feedback);

View File

@@ -1,9 +1,10 @@
//! Defines per timeline data stored persistently (SafeKeeperPersistentState)
//! and its wrapper with in memory layer (SafekeeperState).
use std::ops::Deref;
use std::{cmp::max, ops::Deref};
use anyhow::Result;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -12,7 +13,7 @@ use utils::{
use crate::{
control_file,
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
wal_backup_partial::{self},
};
@@ -147,9 +148,11 @@ pub struct TimelineMemState {
pub proposer_uuid: PgUuid,
}
/// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
/// when we update fields like commit_lsn which don't need immediate
/// persistence. Provides transactional like API to atomically update the state.
/// Safekeeper persistent state plus in memory layer.
///
/// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
/// which don't need immediate persistence. Provides transactional like API
/// to atomically update the state.
///
/// Implements Deref into *persistent* part.
pub struct TimelineState<CTRL: control_file::Storage> {
@@ -209,6 +212,27 @@ where
let s = self.start_change();
self.finish_change(&s).await
}
/// Make term at least as `to`. If `to` is None, increment current one. This
/// is not in safekeeper.rs because we want to be able to do it even if
/// timeline is offloaded.
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let before = self.acceptor_state.term;
let mut state = self.start_change();
let new = match to {
Some(to) => max(state.acceptor_state.term, to),
None => state.acceptor_state.term + 1,
};
if new > state.acceptor_state.term {
state.acceptor_state.term = new;
self.finish_change(&state).await?;
}
let after = self.acceptor_state.term;
Ok(TimelineTermBumpResponse {
previous_term: before,
current_term: after,
})
}
}
impl<CTRL> Deref for TimelineState<CTRL>

View File

@@ -4,6 +4,7 @@
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
@@ -169,6 +170,7 @@ impl<'a> Drop for WriteGuardSharedState<'a> {
}
/// This structure is stored in shared state and represents the state of the timeline.
///
/// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
/// case, SafeKeeper is not available (because WAL is not present on disk) and all
/// operations can be done only with control file.
@@ -214,6 +216,10 @@ impl StateSK {
.get_last_log_term(self.flush_lsn())
}
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
self.state_mut().term_bump(to).await
}
/// Close open WAL files to release FDs.
fn close_wal_store(&mut self) {
if let StateSK::Loaded(sk) = self {
@@ -853,6 +859,11 @@ impl Timeline {
Ok(res)
}
pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let mut state = self.write_shared_state().await;
state.sk.term_bump(to).await
}
/// Get the timeline guard for reading/writing WAL files.
/// If WAL files are not present on disk (evicted), they will be automatically
/// downloaded from remote storage. This is done in the manager task, which is

View File

@@ -1,6 +1,8 @@
//! Code related to evicting WAL files to remote storage. The actual upload is done by the
//! partial WAL backup code. This file has code to delete and re-download WAL files,
//! cross-validate with partial WAL backup if local file is still present.
//! Code related to evicting WAL files to remote storage.
//!
//! The actual upload is done by the partial WAL backup code. This file has
//! code to delete and re-download WAL files, cross-validate with partial WAL
//! backup if local file is still present.
use anyhow::Context;
use camino::Utf8PathBuf;

View File

@@ -1,4 +1,6 @@
//! Timeline residence guard is needed to ensure that WAL segments are present on disk,
//! Timeline residence guard
//!
//! It is needed to ensure that WAL segments are present on disk,
//! as long as the code is holding the guard. This file implements guard logic, to issue
//! and drop guards, and to notify the manager when the guard is dropped.

View File

@@ -1,4 +1,5 @@
//! The timeline manager task is responsible for managing the timeline's background tasks.
//!
//! It is spawned alongside each timeline and exits when the timeline is deleted.
//! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
//! It also can manage some reactive state, like should the timeline be active for broker pushes or not.

View File

@@ -60,7 +60,8 @@ impl TimelinesSet {
}
}
/// Guard is used to add or remove timeline from the set.
/// Guard is used to add or remove timelines from the set.
///
/// If the timeline present in set, it will be removed from it on drop.
/// Note: do not use more than one guard for the same timeline, it caches the presence state.
/// It is designed to be used in the manager task only.

View File

@@ -1,6 +1,8 @@
//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
//! was changed), the segment will be uploaded to S3 in about 15 minutes.
//! and `flush_lsn` updates.
//!
//! After the partial segment was updated (`flush_lsn` was changed), the segment
//! will be uploaded to S3 within the configured `partial_backup_timeout`.
//!
//! The filename format for partial segments is
//! `Segment_Term_Flush_Commit_skNN.partial`, where:

View File

@@ -17,6 +17,7 @@ use crate::SafeKeeperConf;
use postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
///
/// allowed_auth_scope is either SafekeeperData (wide JWT tokens giving access
/// to any tenant are allowed) or Tenant (only tokens giving access to specific
/// tenant are allowed). Doesn't matter if auth is disabled in conf.

View File

@@ -98,7 +98,19 @@ pub struct PhysicalStorage {
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
write_lsn: Lsn,
/// The LSN of the last WAL record written to disk. Still can be not fully flushed.
/// The LSN of the last WAL record written to disk. Still can be not fully
/// flushed.
///
/// Note: Normally it (and flush_record_lsn) is <= write_lsn, but after xlog
/// switch ingest the reverse is true because we don't bump write_lsn up to
/// the next segment: WAL stream from the compute doesn't have the gap and
/// for simplicity / as a sanity check we disallow any non-sequential
/// writes, so write zeros as is.
///
/// Similar effect is in theory possible due to LSN alignment: if record
/// ends at *2, decoder will report end lsn as *8 even though we haven't
/// written these zeros yet. In practice compute likely never sends
/// non-aligned chunks of data.
write_record_lsn: Lsn,
/// The LSN of the last WAL record flushed to disk.
@@ -167,8 +179,7 @@ impl PhysicalStorage {
)
};
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
// note: this assumes we fsync'ed whole datadir on start.
let flush_lsn = write_lsn;
debug!(
@@ -440,11 +451,12 @@ impl Storage for PhysicalStorage {
.with_label_values(&["truncate_wal"])
.start_timer();
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
// Streaming must not create a hole, so truncate cannot be called on
// non-written lsn.
if self.write_record_lsn != Lsn(0) && end_pos > self.write_record_lsn {
bail!(
"truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
self.write_lsn,
"truncate_wal called on non-written WAL, write_record_lsn={}, end_pos={}",
self.write_record_lsn,
end_pos
);
}

View File

@@ -134,7 +134,7 @@ class LLVM:
# Show a user-friendly warning
raise Exception(' '.join([
f"It appears that you don't have `{name}` installed.",
"Please execute `rustup component add llvm-tools-preview`,",
"Please execute `rustup component add llvm-tools`,",
"or install it via your package manager of choice.",
"LLVM tools should be the same version as LLVM in `rustc --version --verbose`.",
]))
@@ -518,7 +518,7 @@ def main() -> None:
example = f"""
prerequisites:
# alternatively, install a system package for `llvm-tools`
rustup component add llvm-tools-preview
rustup component add llvm-tools
self-contained example:
{app} run make

View File

@@ -0,0 +1 @@
ALTER TABLE nodes ALTER availability_zone_id DROP NOT NULL;

View File

@@ -0,0 +1 @@
ALTER TABLE nodes ALTER availability_zone_id SET NOT NULL;

Some files were not shown because too many files have changed in this diff Show More