Compare commits

...

26 Commits

Author SHA1 Message Date
Arseny Sher
f6e0a1a5f5 v16-lr-worker-reply-after-processing 2024-09-10 17:52:40 +03:00
Matthias van de Meent
842be0ba74 Specialize WalIngest on PostgreSQL version (#8904)
The current code assumes that most of this functionality is
version-independent, which is only true up to v16 - PostgreSQL 17 has a
new field in CheckPoint that we need to keep track of.

This basically removes the file-level dependency on v14, and replaces it
with switches that load the correct version dependencies where required.
2024-09-09 23:01:52 +01:00
Heikki Linnakangas
982b376ea2 Update parquet crate to a released version (#8961)
PR #7782 set the dependency in Cargo.toml to 'master', and locked the
version to commit that contained a specific fix, because we needed the
fix before it was included in a versioned release. The fix was later
included in parquet crate version 52.0.0, so we can now switch back to
using a released version. The latest release is 53.0.0, switch straight
to that.

---------

Co-authored-by: Conrad Ludgate <conradludgate@gmail.com>
2024-09-10 00:04:00 +03:00
Alex Chi Z.
e158df4e86 feat(pageserver): split delta writer automatically determines key range (#8850)
close https://github.com/neondatabase/neon/issues/8838

## Summary of changes

This patch modifies the split delta layer writer to avoid taking
start_key and end_key when creating/finishing the layer writer. The
start_key for the delta layers will be the first key provided to the
layer writer, and the end_key would be the `last_key.next()`. This
simplifies the delta layer writer API.

On that, the layer key hack is removed. Image layers now use the full
key range, and delta layers use the first/last key provided by the user.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-09 22:03:27 +01:00
Heikki Linnakangas
723c0971e8 Don't create 'empty' branch in neon_simple_env (#8965)
Now that we've given up hope on sharing the neon_simple_env between
tests, there's no reason to not use the 'main' branch directly.
2024-09-09 12:38:34 +03:00
Heikki Linnakangas
c8f67eed8f Remove TEST_SHARED_FIXTURES (#8965)
I wish it worked, but it's been broken for a long time, so let's admit
defeat and remove it.

The idea of sharing the same pageserver and safekeeper environment
between tests is still sound, and it could save a lot of time in our
CI. We should perhaps put some time into doing that, but we're better
off starting from scratch than trying to make TEST_SHARED_FIXTURES
work in its current form.
2024-09-09 12:38:34 +03:00
Heikki Linnakangas
2d885ac07a Update strum (#8962)
I wanted to use some features from the newer version. The PR that needed
the new version is not ready yet (and might never be), but seems nice to
stay up in any case.
2024-09-08 21:47:57 +03:00
Heikki Linnakangas
89c5e80b3f Update toml and toml_edit crates (#8963)
Eliminates a few duplicate versions from the dependency tree.
2024-09-08 21:47:23 +03:00
Heikki Linnakangas
93ec7503e0 Lock the correct revision of rust-postgres crates (#8960)
We modified the crate in an incompatible way and upgraded to the new
version in PR #8076. However, it was reverted in #8654. The revert
reverted the Cargo.lock reference to it, but since Cargo.toml still
points to the (tip of the) 'neon' branch, every time you make any other
unrelated changes to Cargo.toml, it also tries to update the
rust-postgres crates to the tip of the 'neon' branch again, which
doesn't work.

To fix, lock the crates to the exact commit SHA that works.
2024-09-07 14:11:36 +01:00
Alexander Bayandin
7d7d1f354b Fix rust warnings on macOS (#8955)
## Problem
```
error: unused import: `anyhow::Context`
 --> libs/utils/src/crashsafe.rs:8:5
  |
8 | use anyhow::Context;
  |     ^^^^^^^^^^^^^^^
  |
  = note: `-D unused-imports` implied by `-D warnings`
  = help: to override `-D warnings` add `#[allow(unused_imports)]`

error: unused variable: `fd`
   --> libs/utils/src/crashsafe.rs:209:15
    |
209 | pub fn syncfs(fd: impl AsRawFd) -> anyhow::Result<()> {
    |               ^^ help: if this is intentional, prefix it with an underscore: `_fd`
    |
    = note: `-D unused-variables` implied by `-D warnings`
    = help: to override `-D warnings` add `#[allow(unused_variables)]`
```

## Summary of changes
- Fix rust warnings on macOS
2024-09-07 08:17:25 +01:00
Cihan Demirci
16c200d6d9 push images to prod ACR (#8940)
Used `vars` for new storing non-sensitive information, changed dev
secrets to vars as well but
didn't cleanup any secrets.

https://github.com/neondatabase/cloud/issues/16925

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-09-07 00:20:36 +01:00
Joonas Koivunen
3dbd34aa78 feat(storcon): forward gc blocking and unblocking (#8956)
Currently using gc blocking and unblocking with storage controller
managed pageservers is painful. Implement the API on storage controller.

Fixes: #8893
2024-09-06 22:42:55 +01:00
Arpad Müller
fa3fc73c1b Address 1.82 clippy lints (#8944)
Addresses the clippy lints of the beta 1.82 toolchain.

The `too_long_first_doc_paragraph` lint complained a lot and was
addressed separately: #8941
2024-09-06 21:05:18 +02:00
Alex Chi Z.
ac5815b594 feat(storage-controller): add node shards api (#8896)
For control-plane managed tenants, we have the page in the admin console
that lists all tenants on a specific pageserver. But for
storage-controller managed ones, we don't have that functionality for
now.

## Summary of changes

Adds an API that lists all shards on a given node (intention + observed)

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-06 14:14:21 -04:00
Alexander Bayandin
30583cb626 CI(label-for-external-users): add retry logic for unexpected errors (#8938)
## Problem

One of the PRs opened by a `neondatabase` org member got labelled as
`external` because the `gh api` call failed in the wrong way:
```
Get "https://api.github.com/orgs/neondatabase/members/<username>": dial tcp 140.82.114.5:443: i/o timeout
is-member=false
```

## Summary of changes
- Check that the error message is expected before labelling PRs
- Retry `gh api` call for 10 times in case of unexpected error messages
- Add `workflow_dispatch` trigger
2024-09-06 17:42:35 +01:00
Arseny Sher
c1a51416db safekeeper: fsync filesystem on start.
We can't really rely on files contents after boot without fsync'ing
them.
2024-09-06 19:14:25 +03:00
Arseny Sher
8eab7009c1 safekeeper: do pid file lock before id init 2024-09-06 19:14:25 +03:00
Arseny Sher
11cf16e3f3 safekeeper: add term_bump endpoint.
When walproposer observes now higher term it restarts instead of
crashing whole compute with PANIC; this avoids compute crash after
term_bump call. After successfull election we're still checking
last_log_term of the highest given vote to ensure basebackup is good,
and PANIC otherwise.

It will be used for migration per
035-safekeeper-dynamic-membership-change.md
and
https://github.com/neondatabase/docs/pull/21

ref https://github.com/neondatabase/neon/issues/8700
2024-09-06 19:13:50 +03:00
Folke Behrens
af6f63617e proxy: clean up code and lints for 1.81 and 1.82 (#8945) 2024-09-06 17:13:30 +02:00
Arseny Sher
e287f36a05 safekeeper: fix endpoint restart immediately after xlog switch.
Check that truncation point is not from the future by comparing it with
write_record_lsn, not write_lsn, and explain that xlog switch changes
their normal order.

ref https://github.com/neondatabase/neon/issues/8911
2024-09-06 18:09:21 +03:00
Arpad Müller
cbcd4058ed Fix 1.82 clippy lint too_long_first_doc_paragraph (#8941)
Addresses the 1.82 beta clippy lint `too_long_first_doc_paragraph` by
adding newlines to the first sentence if it is short enough, and making
a short first sentence if there is the need.
2024-09-06 14:33:52 +02:00
Vlad Lazar
e86fef05dd storcon: track preferred AZ for each tenant shard (#8937)
## Problem
We want to do AZ aware scheduling, but don't have enough metadata.

## Summary of changes
Introduce a `preferred_az_id` concept for each managed tenant shard.

In a future PR, the scheduler will use this as a soft preference. 
The idea is to try and keep the shard attachments within the same AZ.
Under the assumption that the compute was placed in the correct AZ,
this reduces the chances of cross AZ trafic from between compute and PS.

In terms of code changes we:
1. Add a new nullable `preferred_az_id` column to the `tenant_shards`
table. Also include an in-memory counterpart.
2. Populate the preferred az on tenant creation and shard splits.
3. Add an endpoint which allows to bulk-set preferred AZs.

(3) gives us the migration path. I'll write a script which queries the
cplane db in the region and sets the preferred az of all shards with an 
active compute to the AZ of said compute. For shards without an active compute, 
I'll use the AZ of the currently attached pageserver
since this is what cplane uses now to schedule computes.
2024-09-06 13:11:17 +01:00
Arpad Müller
a1323231bc Update Rust to 1.81.0 (#8939)
We keep the practice of keeping the compiler up to date, pointing to the
latest release. This is done by many other projects in the Rust
ecosystem as well.

[Release notes](https://github.com/rust-lang/rust/blob/master/RELEASES.md#version-1810-2024-09-05).

Prior update was in #8667 and #8518
2024-09-06 12:40:19 +02:00
Christian Schwarz
06e840b884 compact_level0_phase1: ignore access mode config, always do streaming-kmerge without validation (#8934)
refs https://github.com/neondatabase/neon/issues/8184

PR https://github.com/neondatabase/infra/pull/1905 enabled
streaming-kmerge without validation everywhere.

It rolls into prod sooner or in the same release as this PR.
2024-09-06 10:58:48 +02:00
Christian Schwarz
cf11c8ab6a update svg_fmt to 0.4.3 (#8930)
Audited

```
diff -r -u ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/svg_fmt-0.4.{2,3}
```

fixes https://github.com/neondatabase/neon/issues/7763
2024-09-06 10:52:29 +02:00
Vlad Lazar
04f99a87bf storcon: make pageserver AZ id mandatory (#8856)
## Problem
https://github.com/neondatabase/neon/pull/8852 introduced a new nullable
column for the `nodes` table: `availability_zone_id`

## Summary of changes
* Make neon local and the test suite always provide an az id
* Make the az id field in the ps registration request mandatory
* Migrate the column to non-nullable and adjust in memory state
accordingly
* Remove the code that was used to populate the az id for pre-existing
nodes
2024-09-05 19:14:21 +01:00
145 changed files with 1966 additions and 1135 deletions

View File

@@ -7,6 +7,13 @@ self-hosted-runner:
- small-arm64
- us-east-2
config-variables:
- AZURE_DEV_CLIENT_ID
- AZURE_DEV_REGISTRY_NAME
- AZURE_DEV_SUBSCRIPTION_ID
- AZURE_PROD_CLIENT_ID
- AZURE_PROD_REGISTRY_NAME
- AZURE_PROD_SUBSCRIPTION_ID
- AZURE_TENANT_ID
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB
- REMOTE_STORAGE_AZURE_CONTAINER

56
.github/workflows/_push-to-acr.yml vendored Normal file
View File

@@ -0,0 +1,56 @@
name: Push images to ACR
on:
workflow_call:
inputs:
client_id:
description: Client ID of Azure managed identity or Entra app
required: true
type: string
image_tag:
description: Tag for the container image
required: true
type: string
images:
description: Images to push
required: true
type: string
registry_name:
description: Name of the container registry
required: true
type: string
subscription_id:
description: Azure subscription ID
required: true
type: string
tenant_id:
description: Azure tenant ID
required: true
type: string
jobs:
push-to-acr:
runs-on: ubuntu-22.04
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
steps:
- name: Azure login
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ inputs.client_id }}
subscription-id: ${{ inputs.subscription_id }}
tenant-id: ${{ inputs.tenant_id }}
- name: Login to ACR
run: |
az acr login --name=${{ inputs.registry_name }}
- name: Copy docker images to ACR ${{ inputs.registry_name }}
run: |
images='${{ inputs.images }}'
for image in ${images}; do
docker buildx imagetools create \
-t ${{ inputs.registry_name }}.azurecr.io/neondatabase/${image}:${{ inputs.image_tag }} \
neondatabase/${image}:${{ inputs.image_tag }}
done

View File

@@ -794,9 +794,6 @@ jobs:
docker compose -f ./docker-compose/docker-compose.yml down
promote-images:
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04
@@ -823,28 +820,6 @@ jobs:
neondatabase/vm-compute-node-${version}:${{ needs.tag.outputs.build-tag }}
done
- name: Azure login
if: github.ref_name == 'main'
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: Login to ACR
if: github.ref_name == 'main'
run: |
az acr login --name=neoneastus2
- name: Copy docker images to ACR-dev
if: github.ref_name == 'main'
run: |
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
docker buildx imagetools create \
-t neoneastus2.azurecr.io/neondatabase/${image}:${{ needs.tag.outputs.build-tag }} \
neondatabase/${image}:${{ needs.tag.outputs.build-tag }}
done
- name: Add latest tag to images
if: github.ref_name == 'main'
run: |
@@ -882,6 +857,30 @@ jobs:
369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
done
push-to-acr-dev:
if: github.ref_name == 'main'
needs: [ tag, promote-images ]
uses: ./.github/workflows/_push-to-acr.yml
with:
client_id: ${{ vars.AZURE_DEV_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
push-to-acr-prod:
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
needs: [ tag, promote-images ]
uses: ./.github/workflows/_push-to-acr.yml
with:
client_id: ${{ vars.AZURE_PROD_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, tag ]
runs-on: ubuntu-22.04
@@ -957,8 +956,8 @@ jobs:
exit 1
deploy:
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait, push-to-acr-dev, push-to-acr-prod ]
if: (github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy') && !failure() && !cancelled()
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest

View File

@@ -7,6 +7,11 @@ on:
pull_request_target:
types:
- opened
workflow_dispatch:
inputs:
github-actor:
description: 'GitHub username. If empty, the username of the current user will be used'
required: false
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
@@ -26,12 +31,31 @@ jobs:
id: check-user
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
ACTOR: ${{ inputs.github-actor || github.actor }}
run: |
if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then
is_member=true
else
is_member=false
fi
expected_error="User does not exist or is not a member of the organization"
output_file=output.txt
for i in $(seq 1 10); do
if gh api "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${ACTOR}" \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" > ${output_file}; then
is_member=true
break
elif grep -q "${expected_error}" ${output_file}; then
is_member=false
break
elif [ $i -eq 10 ]; then
title="Failed to get memmbership status for ${ACTOR}"
message="The latest GitHub API error message: '$(cat ${output_file})'"
echo "::error file=.github/workflows/label-for-external-users.yml,title=${title}::${message}"
exit 1
fi
sleep 1
done
echo "is-member=${is_member}" | tee -a ${GITHUB_OUTPUT}

150
Cargo.lock generated
View File

@@ -915,25 +915,22 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.65.1"
version = "0.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.4.1",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"itertools 0.12.1",
"log",
"peeking_take_while",
"prettyplease 0.2.6",
"prettyplease 0.2.17",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 2.0.52",
"which",
]
[[package]]
@@ -1192,9 +1189,9 @@ dependencies = [
[[package]]
name = "comfy-table"
version = "6.1.4"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e7b787b0dc42e8111badfdbe4c3059158ccb2db8780352fa1b01e8ccf45cc4d"
checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
dependencies = [
"crossterm",
"strum",
@@ -1249,7 +1246,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -1363,8 +1360,8 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-util",
"toml 0.7.4",
"toml_edit 0.19.10",
"toml",
"toml_edit",
"tracing",
"url",
"utils",
@@ -1488,25 +1485,22 @@ checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
[[package]]
name = "crossterm"
version = "0.25.0"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67"
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.4.1",
"crossterm_winapi",
"libc",
"mio",
"parking_lot 0.12.1",
"signal-hook",
"signal-hook-mio",
"winapi",
]
[[package]]
name = "crossterm_winapi"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c"
checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b"
dependencies = [
"winapi",
]
@@ -2949,12 +2943,6 @@ dependencies = [
"spin 0.5.2",
]
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.150"
@@ -3153,7 +3141,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd01039851e82f8799046eabbb354056283fb265c8ec0996af940f4e85a380ff"
dependencies = [
"serde",
"toml 0.8.14",
"toml",
]
[[package]]
@@ -3669,7 +3657,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"utils",
"workspace_hack",
]
@@ -3756,7 +3744,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"twox-hash",
"url",
@@ -3919,8 +3907,9 @@ dependencies = [
[[package]]
name = "parquet"
version = "51.0.0"
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0fbf928021131daaa57d334ca8e3904fe9ae22f73c56244fc7db9b04eedc3d8"
dependencies = [
"ahash",
"bytes",
@@ -3939,8 +3928,9 @@ dependencies = [
[[package]]
name = "parquet_derive"
version = "51.0.0"
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86e9fcfae007533a06b580429a3f7e07cb833ec8aa37c041c16563e7918f057e"
dependencies = [
"parquet",
"proc-macro2",
@@ -3977,12 +3967,6 @@ dependencies = [
"sha2",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "pem"
version = "3.0.3"
@@ -4136,7 +4120,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4149,7 +4133,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -4168,7 +4152,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4280,9 +4264,9 @@ dependencies = [
[[package]]
name = "prettyplease"
version = "0.2.6"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b69d39aab54d069e7f2fe8cb970493e7834601ca2d8c65fd7bbd183578080d1"
checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7"
dependencies = [
"proc-macro2",
"syn 2.0.52",
@@ -4827,7 +4811,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"utils",
]
@@ -5337,7 +5321,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"tracing-subscriber",
"url",
@@ -5746,17 +5730,6 @@ dependencies = [
"signal-hook-registry",
]
[[package]]
name = "signal-hook-mio"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
dependencies = [
"libc",
"mio",
"signal-hook",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@@ -6069,21 +6042,21 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strum"
version = "0.24.1"
version = "0.26.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
[[package]]
name = "strum_macros"
version = "0.24.3"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
dependencies = [
"heck 0.4.1",
"heck 0.5.0",
"proc-macro2",
"quote",
"rustversion",
"syn 1.0.109",
"syn 2.0.52",
]
[[package]]
@@ -6094,8 +6067,9 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "svg_fmt"
version = "0.4.2"
source = "git+https://github.com/nical/rust_debug?rev=28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4#28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20e16a0f46cf5fd675563ef54f26e83e20f2366bcf027bcb3cc3ed2b98aaf2ca"
[[package]]
name = "syn"
@@ -6423,7 +6397,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"async-trait",
"byteorder",
@@ -6534,18 +6508,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "toml"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.19.10",
]
[[package]]
name = "toml"
version = "0.8.14"
@@ -6555,7 +6517,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.22.14",
"toml_edit",
]
[[package]]
@@ -6567,19 +6529,6 @@ dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.19.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739"
dependencies = [
"indexmap 1.9.3",
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.4.6",
]
[[package]]
name = "toml_edit"
version = "0.22.14"
@@ -6590,7 +6539,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.6.13",
"winnow",
]
[[package]]
@@ -7003,7 +6952,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit 0.19.10",
"toml_edit",
"tracing",
"tracing-error",
"tracing-subscriber",
@@ -7549,15 +7498,6 @@ version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
[[package]]
name = "winnow"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699"
dependencies = [
"memchr",
]
[[package]]
name = "winnow"
version = "0.6.13"
@@ -7627,6 +7567,7 @@ dependencies = [
"hyper 0.14.26",
"indexmap 1.9.3",
"itertools 0.10.5",
"itertools 0.12.1",
"lazy_static",
"libc",
"log",
@@ -7664,6 +7605,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.24.0",
"tokio-util",
"toml_edit",
"tonic",
"tower",
"tracing",

View File

@@ -64,7 +64,7 @@ aws-types = "1.2.0"
axum = { version = "0.6.20", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.65"
bindgen = "0.70"
bit_field = "0.10.2"
bstr = "1.0"
byteorder = "1.4"
@@ -73,7 +73,7 @@ camino = "1.1.6"
cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive"] }
comfy-table = "6.1"
comfy-table = "7.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-deque = "0.8.5"
@@ -123,8 +123,8 @@ opentelemetry = "0.20.0"
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.12.0"
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
parquet_derive = "51.0.0"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.16"
@@ -158,11 +158,10 @@ signal-hook = "0.3"
smallvec = "1.11"
smol_str = { version = "0.2.0", features = ["serde"] }
socket2 = "0.5"
strum = "0.24"
strum_macros = "0.24"
strum = "0.26"
strum_macros = "0.26"
"subtle" = "2.5.0"
# Our PR https://github.com/nical/rust_debug/pull/4 has been merged but no new version released yet
svg_fmt = { git = "https://github.com/nical/rust_debug", rev = "28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4" }
svg_fmt = "0.4.3"
sync_wrapper = "0.1.2"
tar = "0.4"
task-local-extensions = "0.1.4"
@@ -178,8 +177,8 @@ tokio-rustls = "0.25"
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.7"
toml_edit = "0.19"
toml = "0.8"
toml_edit = "0.22"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tower-service = "0.3.2"
tracing = "0.1"
@@ -202,10 +201,21 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
# We want to use the 'neon' branch for these, but there's currently one
# incompatible change on the branch. See:
#
# - PR #8076 which contained changes that depended on the new changes in
# the rust-postgres crate, and
# - PR #8654 which reverted those changes and made the code in proxy incompatible
# with the tip of the 'neon' branch again.
#
# When those proxy changes are re-applied (see PR #8747), we can switch using
# the tip of the 'neon' branch again.
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -242,11 +252,7 @@ tonic-build = "0.9"
[patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
# bug fixes for UUID
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master" }
parquet_derive = { git = "https://github.com/apache/arrow-rs", branch = "master" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
################# Binary contents sections

View File

@@ -87,6 +87,7 @@ RUN mkdir -p /data/.neon/ && \
"pg_distrib_dir='/usr/local/'\n" \
"listen_pg_addr='0.0.0.0:6400'\n" \
"listen_http_addr='0.0.0.0:9898'\n" \
"availability_zone='local'\n" \
> /data/.neon/pageserver.toml && \
chown -R neon:neon /data/.neon

View File

@@ -192,7 +192,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.80.1
ENV RUSTC_VERSION=1.81.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1
@@ -207,7 +207,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
export PATH="$HOME/.cargo/bin:$PATH" && \
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools-preview rustfmt clippy && \
rustup component add llvm-tools rustfmt clippy && \
cargo install rustfilt --version ${RUSTFILT_VERSION} && \
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} && \
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \

View File

@@ -22,9 +22,10 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Escape a string for including it in a SQL literal. Wrapping the result
/// with `E'{}'` or `'{}'` is not required, as it returns a ready-to-use
/// SQL string literal, e.g. `'db'''` or `E'db\\'`.
/// Escape a string for including it in a SQL literal.
///
/// Wrapping the result with `E'{}'` or `'{}'` is not required,
/// as it returns a ready-to-use SQL string literal, e.g. `'db'''` or `E'db\\'`.
/// See <https://github.com/postgres/postgres/blob/da98d005cdbcd45af563d0c4ac86d0e9772cd15f/src/backend/utils/adt/quote.c#L47>
/// for the original implementation.
pub fn escape_literal(s: &str) -> String {

View File

@@ -75,14 +75,14 @@ impl PageServerNode {
}
}
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::Document {
toml_edit::Document::from_str(&format!("id={node_id}")).unwrap()
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::DocumentMut {
toml_edit::DocumentMut::from_str(&format!("id={node_id}")).unwrap()
}
fn pageserver_init_make_toml(
&self,
conf: NeonLocalInitPageserverConf,
) -> anyhow::Result<toml_edit::Document> {
) -> anyhow::Result<toml_edit::DocumentMut> {
assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully");
// TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656)
@@ -137,9 +137,9 @@ impl PageServerNode {
// Turn `overrides` into a toml document.
// TODO: above code is legacy code, it should be refactored to use toml_edit directly.
let mut config_toml = toml_edit::Document::new();
let mut config_toml = toml_edit::DocumentMut::new();
for fragment_str in overrides {
let fragment = toml_edit::Document::from_str(&fragment_str)
let fragment = toml_edit::DocumentMut::from_str(&fragment_str)
.expect("all fragments in `overrides` are valid toml documents, this function controls that");
for (key, item) in fragment.iter() {
config_toml.insert(key, item.clone());

View File

@@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest,
NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -80,7 +80,10 @@ enum Command {
/// List nodes known to the storage controller
Nodes {},
/// List tenants known to the storage controller
Tenants {},
Tenants {
/// If this field is set, it will list the tenants on a specific node
node_id: Option<NodeId>,
},
/// Create a new tenant in the storage controller, and by extension on pageservers.
TenantCreate {
#[arg(long)]
@@ -336,7 +339,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
availability_zone_id: Some(availability_zone_id),
availability_zone_id,
}),
)
.await?;
@@ -403,7 +406,41 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::Tenants {} => {
Command::Tenants {
node_id: Some(node_id),
} => {
let describe_response = storcon_client
.dispatch::<(), NodeShardResponse>(
Method::GET,
format!("control/v1/node/{node_id}/shards"),
None,
)
.await?;
let shards = describe_response.shards;
let mut table = comfy_table::Table::new();
table.set_header([
"Shard",
"Intended Primary/Secondary",
"Observed Primary/Secondary",
]);
for shard in shards {
table.add_row([
format!("{}", shard.tenant_shard_id),
match shard.is_intended_secondary {
None => "".to_string(),
Some(true) => "Secondary".to_string(),
Some(false) => "Primary".to_string(),
},
match shard.is_observed_secondary {
None => "".to_string(),
Some(true) => "Secondary".to_string(),
Some(false) => "Primary".to_string(),
},
]);
}
println!("{table}");
}
Command::Tenants { node_id: None } => {
let mut resp = storcon_client
.dispatch::<(), Vec<TenantDescribeResponse>>(
Method::GET,

View File

@@ -68,6 +68,7 @@ macro_rules! register_uint_gauge {
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
///
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
/// while holding the lock.
pub fn register_internal(c: Box<dyn Collector>) -> prometheus::Result<()> {

View File

@@ -104,7 +104,9 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
#[serde(skip_serializing)]
// TODO(https://github.com/neondatabase/neon/issues/8184): remove after this field is removed from all pageserver.toml's
pub compact_level0_phase1_value_access: serde::de::IgnoredAny,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
}
@@ -209,43 +211,6 @@ pub enum GetImpl {
#[serde(transparent)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum CompactL0Phase1ValueAccess {
/// The old way.
PageCachedBlobIo,
/// The new way.
StreamingKmerge {
/// If set, we run both the old way and the new way, validate that
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
/// and if the validation fails,
/// - in tests: fail them with a panic or
/// - in prod, log a rate-limited warning and use the old way's results.
///
/// If not set, we only run the new way and trust its results.
validate: Option<CompactL0BypassPageCacheValidation>,
},
}
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum CompactL0BypassPageCacheValidation {
/// Validate that the series of (key, lsn) pairs are the same.
KeyLsn,
/// Validate that the entire output of old and new way is identical.
KeyLsnValue,
}
impl Default for CompactL0Phase1ValueAccess {
fn default() -> Self {
CompactL0Phase1ValueAccess::StreamingKmerge {
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
}
}
}
/// A tenant's calcuated configuration, which is the result of merging a
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
///
@@ -452,7 +417,7 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
compact_level0_phase1_value_access: Default::default(),
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,

View File

@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::{Duration, Instant};
@@ -57,7 +57,7 @@ pub struct NodeRegisterRequest {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub availability_zone_id: Option<String>,
pub availability_zone_id: String,
}
#[derive(Serialize, Deserialize)]
@@ -74,6 +74,17 @@ pub struct TenantPolicyRequest {
pub scheduling: Option<ShardSchedulingPolicy>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
#[serde(flatten)]
pub preferred_az_ids: HashMap<TenantShardId, String>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
pub updated: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
pub shard_id: TenantShardId,
@@ -101,6 +112,21 @@ pub struct TenantDescribeResponse {
pub config: TenantConfig,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
pub node_id: NodeId,
pub shards: Vec<NodeShard>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShard {
pub tenant_shard_id: TenantShardId,
/// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
pub is_observed_secondary: Option<bool>,
/// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
pub is_intended_secondary: Option<bool>,
}
#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
pub id: NodeId,
@@ -132,8 +158,12 @@ pub struct TenantDescribeResponseShard {
pub is_splitting: bool,
pub scheduling_policy: ShardSchedulingPolicy,
pub preferred_az_id: Option<String>,
}
/// Migration request for a given tenant shard to a given node.
///
/// Explicitly migrating a particular shard is a low level operation
/// TODO: higher level "Reschedule tenant" operation where the request
/// specifies some constraints, e.g. asking it to get off particular node(s)

View File

@@ -263,15 +263,6 @@ impl Key {
field5: u8::MAX,
field6: u32::MAX,
};
/// A key slightly smaller than [`Key::MAX`] for use in layer key ranges to avoid them to be confused with L0 layers
pub const NON_L0_MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX - 1,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {

View File

@@ -62,7 +62,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::EnumVariantNames,
strum_macros::VariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
@@ -305,8 +305,10 @@ pub struct TenantConfig {
pub lsn_lease_length_for_ts: Option<String>,
}
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
/// tenant config. When the first aux file written, the policy will be persisted in the
/// The policy for the aux file storage.
///
/// It can be switched through `switch_aux_file_policy` tenant config.
/// When the first aux file written, the policy will be persisted in the
/// `index_part.json` file and has a limited migration path.
///
/// Currently, we only allow the following migration path:
@@ -896,7 +898,9 @@ pub struct WalRedoManagerStatus {
pub process: Option<WalRedoManagerProcessStatus>,
}
/// The progress of a secondary tenant is mostly useful when doing a long running download: e.g. initiating
/// The progress of a secondary tenant.
///
/// It is mostly useful when doing a long running download: e.g. initiating
/// a download job, timing out while waiting for it to run, and then inspecting this status to understand
/// what's happening.
#[derive(Default, Debug, Serialize, Deserialize, Clone)]

View File

@@ -69,8 +69,10 @@ impl QueryError {
}
/// Returns true if the given error is a normal consequence of a network issue,
/// or the client closing the connection. These errors can happen during normal
/// operations, and don't indicate a bug in our code.
/// or the client closing the connection.
///
/// These errors can happen during normal operations,
/// and don't indicate a bug in our code.
pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(

View File

@@ -7,6 +7,7 @@ use std::fmt;
use url::Host;
/// Parses a string of format either `host:port` or `host` into a corresponding pair.
///
/// The `host` part should be a correct `url::Host`, while `port` (if present) should be
/// a valid decimal u16 of digits only.
pub fn parse_host_port<S: AsRef<str>>(host_port: S) -> Result<(Host, Option<u16>), anyhow::Error> {

View File

@@ -14,7 +14,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
fn include_file(&self, filename: &str) {
// This does the equivalent of passing bindgen::CargoCallbacks
// to the builder .parse_callbacks() method.
let cargo_callbacks = bindgen::CargoCallbacks;
let cargo_callbacks = bindgen::CargoCallbacks::new();
cargo_callbacks.include_file(filename)
}
@@ -121,6 +121,7 @@ fn main() -> anyhow::Result<()> {
.allowlist_type("XLogPageHeaderData")
.allowlist_type("XLogLongPageHeaderData")
.allowlist_var("XLOG_PAGE_MAGIC")
.allowlist_var("PG_MAJORVERSION_NUM")
.allowlist_var("PG_CONTROL_FILE_SIZE")
.allowlist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC")
.allowlist_type("PageHeaderData")

View File

@@ -44,6 +44,9 @@ macro_rules! postgres_ffi {
// Re-export some symbols from bindings
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
pub const ZERO_CHECKPOINT: bytes::Bytes =
bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
}
};
}
@@ -106,6 +109,107 @@ macro_rules! dispatch_pgversion {
};
}
#[macro_export]
macro_rules! enum_pgversion_dispatch {
($name:expr, $typ:ident, $bind:ident, $code:block) => {
enum_pgversion_dispatch!(
name = $name,
bind = $bind,
typ = $typ,
code = $code,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
)
};
(name = $name:expr,
bind = $bind:ident,
typ = $typ:ident,
code = $code:block,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]) => {
match $name {
$(
self::$typ::$variant($bind) => {
use $crate::$md as pgv;
$code
}
),+,
}
};
}
#[macro_export]
macro_rules! enum_pgversion {
{$name:ident, pgv :: $t:ident} => {
enum_pgversion!{
name = $name,
typ = $t,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
}
};
{$name:ident, pgv :: $p:ident :: $t:ident} => {
enum_pgversion!{
name = $name,
path = $p,
typ = $t,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
}
};
{name = $name:ident,
typ = $t:ident,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
pub enum $name {
$($variant ( $crate::$md::$t )),+
}
impl self::$name {
pub fn pg_version(&self) -> u32 {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
})
}
}
$(
impl Into<self::$name> for $crate::$md::$t {
fn into(self) -> self::$name {
self::$name::$variant (self)
}
}
)+
};
{name = $name:ident,
path = $p:ident,
typ = $t:ident,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
pub enum $name {
$($variant ($crate::$md::$p::$t)),+
}
impl $name {
pub fn pg_version(&self) -> u32 {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
})
}
}
$(
impl Into<$name> for $crate::$md::$p::$t {
fn into(self) -> $name {
$name::$variant (self)
}
}
)+
};
}
pub mod pg_constants;
pub mod relfile_utils;

View File

@@ -185,7 +185,7 @@ mod tests {
use super::*;
fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
let toml = input.parse::<toml_edit::Document>().unwrap();
let toml = input.parse::<toml_edit::DocumentMut>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}

View File

@@ -45,6 +45,8 @@ pub use azure_core::Etag;
pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
/// Default concurrency limit for S3 operations
///
/// Currently, sync happens with AWS S3, that has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
@@ -300,7 +302,9 @@ pub trait RemoteStorage: Send + Sync + 'static {
) -> Result<(), TimeTravelError>;
}
/// DownloadStream is sensitive to the timeout and cancellation used with the original
/// Data part of an ongoing [`Download`].
///
/// `DownloadStream` is sensitive to the timeout and cancellation used with the original
/// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
/// with `tokio::io::copy_buf`.
// This has 'static because safekeepers do not use cancellation tokens (yet)

View File

@@ -60,3 +60,16 @@ pub struct TimelineCopyRequest {
pub target_timeline_id: TimelineId,
pub until_lsn: Lsn,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpRequest {
/// bump to
pub term: Option<u64>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpResponse {
// before the request
pub previous_term: u64,
pub current_term: u64,
}

View File

@@ -5,9 +5,10 @@
mod calculation;
pub mod svg;
/// StorageModel is the input to the synthetic size calculation. It represents
/// a tree of timelines, with just the information that's needed for the
/// calculation. This doesn't track timeline names or where each timeline
/// StorageModel is the input to the synthetic size calculation.
///
/// It represents a tree of timelines, with just the information that's needed
/// for the calculation. This doesn't track timeline names or where each timeline
/// begins and ends, for example. Instead, it consists of "points of interest"
/// on the timelines. A point of interest could be the timeline start or end point,
/// the oldest point on a timeline that needs to be retained because of PITR

View File

@@ -5,8 +5,10 @@ use std::{
use metrics::IntCounter;
/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly,
/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and
/// Circuit breakers are for operations that are expensive and fallible.
///
/// If a circuit breaker fails repeatedly, we will stop attempting it for some
/// period of time, to avoid denial-of-service from retries, and
/// to mitigate the log spam from repeated failures.
pub struct CircuitBreaker {
/// An identifier that enables us to log useful errors when a circuit is broken

View File

@@ -1,3 +1,4 @@
use std::os::fd::AsRawFd;
use std::{
borrow::Cow,
fs::{self, File},
@@ -203,6 +204,27 @@ pub fn overwrite(
Ok(())
}
/// Syncs the filesystem for the given file descriptor.
#[cfg_attr(target_os = "macos", allow(unused_variables))]
pub fn syncfs(fd: impl AsRawFd) -> anyhow::Result<()> {
// Linux guarantees durability for syncfs.
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
#[cfg(target_os = "linux")]
{
use anyhow::Context;
nix::unistd::syncfs(fd.as_raw_fd()).context("syncfs")?;
}
#[cfg(target_os = "macos")]
{
// macOS is not a production platform for Neon, don't even bother.
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
compile_error!("Unsupported OS");
}
Ok(())
}
#[cfg(test)]
mod tests {

View File

@@ -249,8 +249,10 @@ macro_rules! id_newtype {
};
}
/// Neon timeline IDs are different from PostgreSQL timeline
/// IDs. They serve a similar purpose though: they differentiate
/// Neon timeline ID.
///
/// They are different from PostgreSQL timeline
/// IDs, but serve a similar purpose: they differentiate
/// between different "histories" of the same cluster. However,
/// PostgreSQL timeline IDs are a bit cumbersome, because they are only
/// 32-bits wide, and they must be in ascending order in any given

View File

@@ -100,7 +100,9 @@ pub enum LockFileRead {
}
/// Open & try to lock the lock file at the given `path`, returning a [handle][`LockFileRead`] to
/// inspect its content. It is not an `Err(...)` if the file does not exist or is already locked.
/// inspect its content.
///
/// It is not an `Err(...)` if the file does not exist or is already locked.
/// Check the [`LockFileRead`] variants for details.
pub fn read_and_hold_lock_file(path: &Utf8Path) -> anyhow::Result<LockFileRead> {
let res = fs::OpenOptions::new().read(true).open(path);

View File

@@ -3,11 +3,9 @@ use std::str::FromStr;
use anyhow::Context;
use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, EnumVariantNames};
use strum_macros::{EnumString, VariantNames};
#[derive(
EnumString, strum_macros::Display, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy,
)]
#[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
Plain,
@@ -190,7 +188,7 @@ impl Drop for TracingPanicHookGuard {
}
/// Named symbol for our panic hook, which logs the panic.
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
fn tracing_panic_hook(info: &std::panic::PanicHookInfo) {
// following rust 1.66.1 std implementation:
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
let location = info.location();

View File

@@ -8,6 +8,7 @@ use tracing::{trace, warn};
use crate::lsn::Lsn;
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
///
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.

View File

@@ -65,6 +65,8 @@ impl<T> Poison<T> {
}
}
/// Armed pointer to a [`Poison`].
///
/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
/// Once modifications are done, use [`Self::disarm`].
/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned

View File

@@ -13,10 +13,11 @@ pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(pub u8);
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
/// Combination of ShardNumber and ShardCount.
///
/// For use within the context of a particular tenant, when we need to know which shard we're
/// dealing with, but do not need to know the full ShardIdentity (because we won't be doing
/// any page->shard mapping), and do not need to know the fully qualified TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
pub shard_number: ShardNumber,

View File

@@ -49,12 +49,11 @@ use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::watch;
///
/// Rcu allows multiple readers to read and hold onto a value without blocking
/// (for very long). Storing to the Rcu updates the value, making new readers
/// immediately see the new value, but it also waits for all current readers to
/// finish.
/// (for very long).
///
/// Storing to the Rcu updates the value, making new readers immediately see
/// the new value, but it also waits for all current readers to finish.
pub struct Rcu<V> {
inner: RwLock<RcuInner<V>>,
}

View File

@@ -5,7 +5,9 @@ use std::sync::{
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
/// `SemaphorePermit`, allowing use of `take` which does not require holding an outer mutex guard
/// `SemaphorePermit`.
///
/// Allows use of `take` which does not require holding an outer mutex guard
/// for the duration of initialization.
///
/// Has no unsafe, builds upon [`tokio::sync::Semaphore`] and [`std::sync::Mutex`].

View File

@@ -10,7 +10,7 @@ pub fn deserialize_item<T>(item: &toml_edit::Item) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let document: toml_edit::Document = match item {
let document: toml_edit::DocumentMut = match item {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()

View File

@@ -7,6 +7,7 @@ pub enum VecMapOrdering {
}
/// Ordered map datastructure implemented in a Vec.
///
/// Append only - can only add keys that are larger than the
/// current max key.
/// Ordering can be adjusted using [`VecMapOrdering`]

View File

@@ -6,9 +6,10 @@ pub enum YieldingLoopError {
Cancelled,
}
/// Helper for long synchronous loops, e.g. over all tenants in the system. Periodically
/// yields to avoid blocking the executor, and after resuming checks the provided
/// cancellation token to drop out promptly on shutdown.
/// Helper for long synchronous loops, e.g. over all tenants in the system.
///
/// Periodically yields to avoid blocking the executor, and after resuming
/// checks the provided cancellation token to drop out promptly on shutdown.
#[inline(always)]
pub async fn yielding_loop<I, T, F>(
interval: usize,

View File

@@ -4,7 +4,6 @@
use std::{env, path::PathBuf, process::Command};
use anyhow::{anyhow, Context};
use bindgen::CargoCallbacks;
fn main() -> anyhow::Result<()> {
// Tell cargo to invalidate the built crate whenever the wrapper changes
@@ -64,16 +63,25 @@ fn main() -> anyhow::Result<()> {
.map_err(|s| anyhow!("Bad postgres server path {s:?}"))?
};
let unwind_abi_functions = [
"log_internal",
"recovery_download",
"start_streaming",
"finish_sync_safekeepers",
"wait_event_set",
"WalProposerStart",
];
// The bindgen::Builder is the main entry point
// to bindgen, and lets you build up options for
// the resulting bindings.
let bindings = bindgen::Builder::default()
let mut builder = bindgen::Builder::default()
// The input header we would like to generate
// bindings for.
.header("bindgen_deps.h")
// Tell cargo to invalidate the built crate whenever any of the
// included header files changed.
.parse_callbacks(Box::new(CargoCallbacks))
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
.allowlist_type("WalProposer")
.allowlist_type("WalProposerConfig")
.allowlist_type("walproposer_api")
@@ -105,7 +113,12 @@ fn main() -> anyhow::Result<()> {
.allowlist_var("WL_SOCKET_MASK")
.clang_arg("-DWALPROPOSER_LIB")
.clang_arg(format!("-I{pgxn_neon}"))
.clang_arg(format!("-I{inc_server_path}"))
.clang_arg(format!("-I{inc_server_path}"));
for name in unwind_abi_functions {
builder = builder.override_abi(bindgen::Abi::CUnwind, name);
}
let bindings = builder
// Finish the builder and generate the bindings.
.generate()
// Unwrap the Result and panic on failure.

View File

@@ -33,7 +33,7 @@ extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemStat
}
}
extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
extern "C-unwind" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
@@ -187,7 +187,7 @@ extern "C" fn conn_blocking_write(
}
}
extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
extern "C-unwind" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
unsafe {
let callback_data = (*(*(*sk).wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
@@ -272,7 +272,7 @@ extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
}
}
extern "C" fn wait_event_set(
extern "C-unwind" fn wait_event_set(
wp: *mut WalProposer,
timeout: ::std::os::raw::c_long,
event_sk: *mut *mut Safekeeper,
@@ -324,7 +324,7 @@ extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
}
}
extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
@@ -340,7 +340,7 @@ extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, sk: *mut Safekee
}
}
extern "C" fn log_internal(
extern "C-unwind" fn log_internal(
wp: *mut WalProposer,
level: ::std::os::raw::c_int,
line: *const ::std::os::raw::c_char,

View File

@@ -1,2 +1,20 @@
pub mod mgmt_api;
pub mod page_service;
/// For timeline_block_unblock_gc, distinguish the two different operations. This could be a bool.
// If file structure is per-kind not per-feature then where to put this?
#[derive(Clone, Copy)]
pub enum BlockUnblock {
Block,
Unblock,
}
impl std::fmt::Display for BlockUnblock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
BlockUnblock::Block => "block",
BlockUnblock::Unblock => "unblock",
};
f.write_str(s)
}
}

View File

@@ -12,6 +12,8 @@ use utils::{
pub use reqwest::Body as ReqwestBody;
use crate::BlockUnblock;
pub mod util;
#[derive(Debug, Clone)]
@@ -454,6 +456,20 @@ impl Client {
.map_err(Error::ReceiveBody)
}
pub async fn timeline_block_unblock_gc(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
dir: BlockUnblock,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
self.mgmt_api_endpoint,
);
self.request(Method::POST, &uri, ()).await.map(|_| ())
}
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{}/reset",

View File

@@ -174,7 +174,7 @@ async fn main() -> anyhow::Result<()> {
println!("specified prefix '{}' failed validation", cmd.prefix);
return Ok(());
};
let toml_document = toml_edit::Document::from_str(&cmd.config_toml_str)?;
let toml_document = toml_edit::DocumentMut::from_str(&cmd.config_toml_str)?;
let toml_item = toml_document
.get("remote_storage")
.expect("need remote_storage");

View File

@@ -37,6 +37,7 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::crashsafe::syncfs;
use utils::failpoint_support;
use utils::logging::TracingErrorLayerEnablement;
use utils::{
@@ -125,7 +126,6 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
// The tenants directory contains all the pageserver local disk state.
@@ -156,23 +156,7 @@ fn main() -> anyhow::Result<()> {
};
let started = Instant::now();
// Linux guarantees durability for syncfs.
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
#[cfg(target_os = "linux")]
{
use std::os::fd::AsRawFd;
nix::unistd::syncfs(dirfd.as_raw_fd()).context("syncfs")?;
}
#[cfg(target_os = "macos")]
{
// macOS is not a production platform for Neon, don't even bother.
drop(dirfd);
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
compile_error!("Unsupported OS");
}
syncfs(dirfd)?;
let elapsed = started.elapsed();
info!(
elapsed_ms = elapsed.as_millis(),

View File

@@ -174,16 +174,14 @@ pub struct PageServerConf {
pub l0_flush: crate::l0_flush::L0FlushConfig,
/// This flag is temporary and will be removed after gradual rollout.
/// See <https://github.com/neondatabase/neon/issues/8184>.
pub compact_level0_phase1_value_access: pageserver_api::config::CompactL0Phase1ValueAccess,
/// Direct IO settings
pub virtual_file_direct_io: virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
}
/// Token for authentication to safekeepers
///
/// We do not want to store this in a PageServerConf because the latter may be logged
/// and/or serialized at a whim, while the token is secret. Currently this token is the
/// same for accessing all tenants/timelines, but may become per-tenant/per-timeline in
@@ -338,7 +336,7 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
compact_level0_phase1_value_access,
compact_level0_phase1_value_access: _,
l0_flush,
virtual_file_direct_io,
concurrent_tenant_warmup,
@@ -383,7 +381,6 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
compact_level0_phase1_value_access,
virtual_file_direct_io,
io_buffer_alignment,
@@ -561,6 +558,16 @@ mod tests {
.expect("parse_and_validate");
}
#[test]
fn test_compactl0_phase1_access_mode_is_ignored_silently() {
let input = indoc::indoc! {r#"
[compact_level0_phase1_value_access]
mode = "streaming-kmerge"
validate = "key-lsn-value"
"#};
toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input).unwrap();
}
/// If there's a typo in the pageserver config, we'd rather catch that typo
/// and fail pageserver startup than silently ignoring the typo, leaving whoever
/// made it in the believe that their config change is effective.
@@ -637,14 +644,5 @@ mod tests {
// some_invalid_field = 23
// "#}
// );
test!(
compact_level0_phase1_value_access,
indoc! {r#"
[compact_level0_phase1_value_access]
mode = "streaming-kmerge"
some_invalid_field = 23
"#}
);
}
}

View File

@@ -1,7 +1,9 @@
//! This module defines `RequestContext`, a structure that we use throughout
//! the pageserver to propagate high-level context from places
//! that _originate_ activity down to the shared code paths at the
//! heart of the pageserver. It's inspired by Golang's `context.Context`.
//! Defines [`RequestContext`].
//!
//! It is a structure that we use throughout the pageserver to propagate
//! high-level context from places that _originate_ activity down to the
//! shared code paths at the heart of the pageserver. It's inspired by
//! Golang's `context.Context`.
//!
//! For example, in `Timeline::get(page_nr, lsn)` we need to answer the following questions:
//! 1. What high-level activity ([`TaskKind`]) needs this page?

View File

@@ -141,10 +141,24 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
m.other
);
let az_id = m
.other
.get("availability_zone_id")
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
let az_id = {
let az_id_from_metadata = m
.other
.get("availability_zone_id")
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
match az_id_from_metadata {
Some(az_id) => Some(az_id),
None => {
tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
conf.availability_zone.clone()
}
}
};
if az_id.is_none() {
panic!("Availablity zone id could not be inferred from metadata.json or pageserver config");
}
Some(NodeRegisterRequest {
node_id: conf.id,
@@ -152,7 +166,7 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
listen_pg_port: m.postgres_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
availability_zone_id: az_id,
availability_zone_id: az_id.expect("Checked above"),
})
}
Err(e) => {

View File

@@ -9,7 +9,7 @@ use metrics::{
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use strum::{EnumCount, VariantNames};
use strum_macros::{EnumVariantNames, IntoStaticStr};
use strum_macros::{IntoStaticStr, VariantNames};
use tracing::warn;
use utils::id::TimelineId;
@@ -27,7 +27,7 @@ const CRITICAL_OP_BUCKETS: &[f64] = &[
];
// Metrics collected on operations on the storage repository.
#[derive(Debug, EnumVariantNames, IntoStaticStr)]
#[derive(Debug, VariantNames, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum StorageTimeOperation {
#[strum(serialize = "layer flush")]

View File

@@ -1021,9 +1021,10 @@ impl Timeline {
}
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
/// by a one WAL record appear atomic.
/// updates to the repository.
///
/// It is created by the 'begin_record' function. It is called for each WAL
/// record, so that all the modifications by a one WAL record appear atomic.
pub struct DatadirModification<'a> {
/// The timeline this modification applies to. You can access this to
/// read the state, but note that any pending updates are *not* reflected
@@ -2048,6 +2049,7 @@ impl<'a> DatadirModification<'a> {
/// This struct facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
///
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the

View File

@@ -1,8 +1,9 @@
//! Timeline repository implementation that keeps old data in layer files, and
//! the recent changes in ephemeral files.
//!
//! Timeline repository implementation that keeps old data in files on disk, and
//! the recent changes in memory. See tenant/*_layer.rs files.
//! The functions here are responsible for locating the correct layer for the
//! get/put call, walking back the timeline branching history as needed.
//! See tenant/*_layer.rs files. The functions here are responsible for locating
//! the correct layer for the get/put call, walking back the timeline branching
//! history as needed.
//!
//! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
//! directory. See docs/pageserver-storage.md for how the files are managed.
@@ -7090,13 +7091,13 @@ mod tests {
vec![
// Image layer at GC horizon
PersistentLayerKey {
key_range: Key::MIN..Key::NON_L0_MAX,
key_range: Key::MIN..Key::MAX,
lsn_range: Lsn(0x30)..Lsn(0x31),
is_delta: false
},
// The delta layer covers the full range (with the layer key hack to avoid being recognized as L0)
// The delta layer below the horizon
PersistentLayerKey {
key_range: Key::MIN..Key::NON_L0_MAX,
key_range: get_key(3)..get_key(4),
lsn_range: Lsn(0x30)..Lsn(0x48),
is_delta: true
},

View File

@@ -452,7 +452,8 @@ impl TryFrom<toml_edit::Item> for TenantConfOpt {
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
}
toml_edit::Item::Table(table) => {
let deserializer = toml_edit::de::Deserializer::new(table.into());
let deserializer =
toml_edit::de::Deserializer::from(toml_edit::DocumentMut::from(table));
return serde_path_to_error::deserialize(deserializer)
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
}

View File

@@ -1,7 +1,8 @@
//! Describes the legacy now hopefully no longer modified per-timeline metadata stored in
//! `index_part.json` managed by [`remote_timeline_client`]. For many tenants and their timelines,
//! this struct and it's original serialization format is still needed because they were written a
//! long time ago.
//! Describes the legacy now hopefully no longer modified per-timeline metadata.
//!
//! It is stored in `index_part.json` managed by [`remote_timeline_client`]. For many tenants and
//! their timelines, this struct and its original serialization format is still needed because
//! they were written a long time ago.
//!
//! Instead of changing and adding versioning to this, just change [`IndexPart`] with soft json
//! versioning.

View File

@@ -282,9 +282,10 @@ impl BackgroundPurges {
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
/// The TenantManager is responsible for storing and mutating the collection of all tenants
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
/// lives inside the TenantManager.
/// Responsible for storing and mutating the collection of all tenants
/// that this pageserver has state for.
///
/// Every Tenant and SecondaryTenant instance lives inside the TenantManager.
///
/// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
/// the same tenant twice concurrently, or trying to configure the same tenant into secondary
@@ -2346,8 +2347,9 @@ pub enum TenantMapError {
ShuttingDown,
}
/// Guards a particular tenant_id's content in the TenantsMap. While this
/// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
/// Guards a particular tenant_id's content in the TenantsMap.
///
/// While this structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
/// for this tenant, which acts as a marker for any operations targeting
/// this tenant to retry later, or wait for the InProgress state to end.
///

View File

@@ -2184,6 +2184,8 @@ pub fn remote_timeline_path(
remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
}
/// Obtains the path of the given Layer in the remote
///
/// Note that the shard component of a remote layer path is _not_ always the same
/// as in the TenantShardId of the caller: tenants may reference layers from a different
/// ShardIndex. Use the ShardIndex from the layer's metadata.

View File

@@ -548,7 +548,7 @@ pub(crate) async fn download_initdb_tar_zst(
cancel,
)
.await
.map_err(|e| {
.inspect_err(|_e| {
// Do a best-effort attempt at deleting the temporary file upon encountering an error.
// We don't have async here nor do we want to pile on any extra errors.
if let Err(e) = std::fs::remove_file(&temp_path) {
@@ -556,7 +556,6 @@ pub(crate) async fn download_initdb_tar_zst(
warn!("error deleting temporary file {temp_path}: {e}");
}
}
e
})?;
Ok((temp_path, file))

View File

@@ -1,4 +1,5 @@
//! In-memory index to track the tenant files on the remote storage.
//!
//! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about
//! remote timeline layers and its metadata.

View File

@@ -434,10 +434,11 @@ impl ReadableLayer {
}
}
/// Layers contain a hint indicating whether they are likely to be used for reads. This is a hint rather
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should
/// be used for cache management but not for correctness-critical checks.
/// Layers contain a hint indicating whether they are likely to be used for reads.
///
/// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
/// when changing the visibility of layers (for example when creating a branch that makes some previously
/// covered layers visible). It should be used for cache management but not for correctness-critical checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LayerVisibilityHint {
/// A Visible layer might be read while serving a read, because there is not an image layer between it

View File

@@ -136,10 +136,11 @@ impl Summary {
// Flag indicating that this version initialize the page
const WILL_INIT: u64 = 1;
/// Struct representing reference to BLOB in layers. Reference contains BLOB
/// offset, and for WAL records it also contains `will_init` flag. The flag
/// helps to determine the range of records that needs to be applied, without
/// reading/deserializing records themselves.
/// Struct representing reference to BLOB in layers.
///
/// Reference contains BLOB offset, and for WAL records it also contains
/// `will_init` flag. The flag helps to determine the range of records
/// that needs to be applied, without reading/deserializing records themselves.
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(pub u64);

View File

@@ -1,7 +1,9 @@
//! An ImageLayer represents an image or a snapshot of a key-range at
//! one particular LSN. It contains an image of all key-value pairs
//! in its key-range. Any key that falls into the image layer's range
//! but does not exist in the layer, does not exist.
//! one particular LSN.
//!
//! It contains an image of all key-value pairs in its key-range. Any key
//! that falls into the image layer's range but does not exist in the layer,
//! does not exist.
//!
//! An image layer is stored in a file on disk. The file is stored in
//! timelines/<timeline_id> directory. Currently, there are no

View File

@@ -12,8 +12,10 @@ use serde::{Deserialize, Serialize};
#[cfg(test)]
use utils::id::TenantId;
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
/// A unique identifier of a persistent layer.
///
/// This is different from `LayerDescriptor`, which is only used in the benchmarks.
/// This struct contains all necessary information to find the image / delta layer. It also provides
/// a unified way to generate layer information like file name.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)]
pub struct PersistentLayerDesc {

View File

@@ -217,8 +217,9 @@ impl fmt::Display for ImageLayerName {
}
}
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time. The
/// LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time.
///
/// The LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
/// over time (e.g. across shard splits or compression). The physical filenames of layers in local
/// storage and object names in remote storage consist of the LayerName plus some extra qualifiers
/// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path])

View File

@@ -226,9 +226,11 @@ impl<'a> IteratorWrapper<'a> {
}
}
/// A merge iterator over delta/image layer iterators. When duplicated records are
/// found, the iterator will not perform any deduplication, and the caller should handle
/// these situation. By saying duplicated records, there are many possibilities:
/// A merge iterator over delta/image layer iterators.
///
/// When duplicated records are found, the iterator will not perform any
/// deduplication, and the caller should handle these situation. By saying
/// duplicated records, there are many possibilities:
///
/// * Two same delta at the same LSN.
/// * Two same image at the same LSN.

View File

@@ -34,9 +34,10 @@ impl SplitWriterResult {
}
}
/// An image writer that takes images and produces multiple image layers. The interface does not
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
/// to be cleaned up)
/// An image writer that takes images and produces multiple image layers.
///
/// The interface does not guarantee atomicity (i.e., if the image layer generation
/// fails, there might be leftover files to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
@@ -187,22 +188,23 @@ impl SplitImageLayerWriter {
.await
}
/// When split writer fails, the caller should call this function and handle partially generated layers.
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
/// to be cleaned up).
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
///
/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
/// there might be leftover files to be cleaned up).
///
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
inner: DeltaLayerWriter,
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
generated_layers: Vec<SplitWriterResult>,
conf: &'static PageServerConf,
@@ -210,7 +212,6 @@ pub struct SplitDeltaLayerWriter {
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
last_key_written: Key,
start_key: Key,
}
impl SplitDeltaLayerWriter {
@@ -218,29 +219,18 @@ impl SplitDeltaLayerWriter {
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_key: Key,
lsn_range: Range<Lsn>,
target_layer_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: DeltaLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
start_key,
lsn_range.clone(),
ctx,
)
.await?,
inner: None,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn_range,
last_key_written: Key::MIN,
start_key,
})
}
@@ -263,9 +253,26 @@ impl SplitDeltaLayerWriter {
//
// Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
// strategy. https://github.com/neondatabase/neon/issues/8837
if self.inner.is_none() {
self.inner = Some((
key,
DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
key,
self.lsn_range.clone(),
ctx,
)
.await?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
if inner.num_keys() >= 1
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
if key != self.last_key_written {
let next_delta_writer = DeltaLayerWriter::new(
@@ -277,13 +284,13 @@ impl SplitDeltaLayerWriter {
ctx,
)
.await?;
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
let (start_key, prev_delta_writer) =
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
let layer_key = PersistentLayerKey {
key_range: self.start_key..key,
key_range: start_key..key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
self.start_key = key;
if discard(&layer_key).await {
drop(prev_delta_writer);
self.generated_layers
@@ -294,17 +301,18 @@ impl SplitDeltaLayerWriter {
self.generated_layers
.push(SplitWriterResult::Produced(delta_layer));
}
} else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
self.inner.estimated_size()
inner.estimated_size()
);
}
}
self.last_key_written = key;
self.inner.put_value(key, lsn, val, ctx).await
let (_, inner) = self.inner.as_mut().unwrap();
inner.put_value(key, lsn, val, ctx).await
}
pub async fn put_value(
@@ -323,7 +331,6 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
discard: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
where
@@ -335,11 +342,15 @@ impl SplitDeltaLayerWriter {
inner,
..
} = self;
let Some((start_key, inner)) = inner else {
return Ok(generated_layers);
};
if inner.num_keys() == 0 {
return Ok(generated_layers);
}
let end_key = self.last_key_written.next();
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
key_range: start_key..end_key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
@@ -358,15 +369,14 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<SplitWriterResult>> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}
/// When split writer fails, the caller should call this function and handle partially generated layers.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
Ok((self.generated_layers, self.inner))
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
Ok((self.generated_layers, self.inner.map(|x| x.1)))
}
}
@@ -430,10 +440,8 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -458,11 +466,22 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
assert_eq!(layers.len(), 1);
assert_eq!(
layers
.into_iter()
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}
#[tokio::test]
@@ -499,10 +518,8 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -531,10 +548,7 @@ mod tests {
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
if discard {
for layer in image_layers {
layer.into_discarded_layer();
@@ -553,6 +567,14 @@ mod tests {
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(
delta_layers.first().unwrap().layer_desc().key_range.start,
get_key(0)
);
assert_eq!(
delta_layers.last().unwrap().layer_desc().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
@@ -600,10 +622,8 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&ctx,
)
.await
.unwrap();
@@ -642,11 +662,35 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
assert_eq!(layers.len(), 2);
let mut layers_iter = layers.into_iter();
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(1)..get_key(2),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}
#[tokio::test]
@@ -666,10 +710,8 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -687,10 +729,20 @@ mod tests {
.await
.unwrap();
}
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
assert_eq!(delta_layers.len(), 1);
let delta_layer = delta_layers
.into_iter()
.next()
.unwrap()
.into_resident_layer();
assert_eq!(
delta_layer.layer_desc().key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
is_delta: true
}
);
}
}

View File

@@ -19,7 +19,6 @@ use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::config::{CompactL0BypassPageCacheValidation, CompactL0Phase1ValueAccess};
use pageserver_api::key::KEY_SIZE;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
@@ -912,137 +911,13 @@ impl Timeline {
// we're compacting, in key, LSN order.
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
// then the Value::Image is ordered before Value::WalRecord.
//
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
// option and validation code once we've reached confidence.
enum AllValuesIter<'a> {
PageCachedBlobIo {
all_keys_iter: VecIter<'a>,
},
StreamingKmergeBypassingPageCache {
merge_iter: MergeIterator<'a>,
},
ValidatingStreamingKmergeBypassingPageCache {
mode: CompactL0BypassPageCacheValidation,
merge_iter: MergeIterator<'a>,
all_keys_iter: VecIter<'a>,
},
}
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
impl AllValuesIter<'_> {
async fn next_all_keys_iter(
iter: &mut VecIter<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
let Some(DeltaEntry {
key,
lsn,
val: value_ref,
..
}) = iter.next()
else {
return Ok(None);
};
let value = value_ref.load(ctx).await?;
Ok(Some((*key, *lsn, value)))
}
async fn next(
&mut self,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
match self {
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
Self::next_all_keys_iter(iter, ctx).await
}
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
// advance both iterators
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
let merge_iter_item = merge_iter.next().await;
// compare results & log warnings as needed
macro_rules! rate_limited_warn {
($($arg:tt)*) => {{
if cfg!(debug_assertions) || cfg!(feature = "testing") {
warn!($($arg)*);
panic!("CompactL0BypassPageCacheValidation failure, check logs");
}
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
use std::sync::Mutex;
use std::time::Duration;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!($($arg)*);
});
}}
}
match (&all_keys_iter_item, &merge_iter_item) {
(Err(_), Err(_)) => {
// don't bother asserting equivality of the errors
}
(Err(all_keys), Ok(merge)) => {
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
},
(Ok(all_keys), Err(merge)) => {
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
},
(Ok(None), Ok(None)) => { }
(Ok(Some(all_keys)), Ok(None)) => {
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
}
(Ok(None), Ok(Some(merge))) => {
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
}
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
match mode {
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
CompactL0BypassPageCacheValidation::KeyLsn => {
let all_keys = (all_keys_key, all_keys_lsn);
let merge = (merge_key, merge_lsn);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
}
}
CompactL0BypassPageCacheValidation::KeyLsnValue => {
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
let merge = (merge_key, merge_lsn, merge_value);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
}
}
}
}
}
// in case of mismatch, trust the legacy all_keys_iter_item
all_keys_iter_item
}.instrument(info_span!("next")).await
}
}
}
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
all_keys_iter: all_keys.iter(),
},
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
let merge_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
match validate {
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
mode: validate.clone(),
merge_iter,
all_keys_iter: all_keys.iter(),
},
}
let mut all_values_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
// This iterator walks through all keys and is needed to calculate size used by each key
@@ -1119,7 +994,7 @@ impl Timeline {
let mut keys = 0;
while let Some((key, lsn, value)) = all_values_iter
.next(ctx)
.next()
.await
.map_err(CompactionError::Other)?
{
@@ -1934,7 +1809,6 @@ impl Timeline {
.unwrap();
// We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
// as an L0 layer.
let hack_end_key = Key::NON_L0_MAX;
let mut delta_layers = Vec::new();
let mut image_layers = Vec::new();
let mut downloaded_layers = Vec::new();
@@ -1980,10 +1854,8 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
Key::MIN,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
ctx,
)
.await?;
@@ -2090,7 +1962,7 @@ impl Timeline {
let produced_image_layers = if let Some(writer) = image_layer_writer {
if !dry_run {
writer
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
.await?
} else {
let (layers, _) = writer.take()?;
@@ -2103,7 +1975,7 @@ impl Timeline {
let produced_delta_layers = if !dry_run {
delta_layer_writer
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.finish_with_discard_fn(self, ctx, discard)
.await?
} else {
let (layers, _) = delta_layer_writer.take()?;

View File

@@ -593,8 +593,10 @@ impl<'a> VectoredBlobReader<'a> {
}
}
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
/// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
///
/// It provides a streaming API for getting read blobs. It returns a batch when
/// `handle` gets called and when the current key would just exceed the read_size and
/// max_cnt constraints.
pub struct StreamingVectoredReadPlanner {
read_builder: Option<VectoredReadBuilder>,

View File

@@ -1,6 +1,7 @@
//!
//! VirtualFile is like a normal File, but it's not bound directly to
//! a file descriptor. Instead, the file is opened when it's read from,
//! a file descriptor.
//!
//! Instead, the file is opened when it's read from,
//! and if too many files are open globally in the system, least-recently
//! used ones are closed.
//!

View File

@@ -25,9 +25,7 @@ use std::time::Duration;
use std::time::SystemTime;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::TimestampTz;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::{bail, Context, Result};
@@ -48,16 +46,31 @@ use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
enum_pgversion! {CheckPoint, pgv::CheckPoint}
impl CheckPoint {
fn encode(&self) -> Result<Bytes, SerializeError> {
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
}
fn update_next_xid(&mut self, xid: u32) -> bool {
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
}
pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
enum_pgversion_dispatch!(self, CheckPoint, cp, {
cp.update_next_multixid(multi_xid, multi_offset)
})
}
}
pub struct WalIngest {
shard: ShardIdentity,
pg_version: u32,
checkpoint: CheckPoint,
checkpoint_modified: bool,
warn_ingest_lag: WarnIngestLag,
@@ -78,12 +91,16 @@ impl WalIngest {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
let pgversion = timeline.pg_version;
let checkpoint = dispatch_pgversion!(pgversion, {
let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
<pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
});
Ok(WalIngest {
shard: *timeline.get_shard_identity(),
pg_version: timeline.pg_version,
checkpoint,
checkpoint_modified: false,
warn_ingest_lag: WarnIngestLag {
@@ -117,7 +134,7 @@ impl WalIngest {
modification.set_lsn(lsn)?;
if decoded.is_dbase_create_copy(self.pg_version) {
if decoded.is_dbase_create_copy(pg_version) {
// Records of this type should always be preceded by a commit(), as they
// rely on reading data pages back from the Timeline.
assert!(!modification.has_dirty_data_pages());
@@ -337,70 +354,67 @@ impl WalIngest {
pg_constants::RM_XLOG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if self.checkpoint.nextOid != next_oid {
self.checkpoint.nextOid = next_oid;
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if cp.nextOid != next_oid {
cp.nextOid = next_oid;
self.checkpoint_modified = true;
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
cp.oldestXid
);
if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
cp.oldestXid = xlog_checkpoint.oldestXid;
}
trace!(
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
xlog_checkpoint.oldestActiveXid,
cp.oldestActiveXid
);
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = cp.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
cp.oldestActiveXid = oldest_active_xid;
} else {
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
// have some trace of the checkpoint records in the layer files at the same
// LSNs.
self.checkpoint_modified = true;
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
self.checkpoint.oldestXid
);
if (self
.checkpoint
.oldestXid
.wrapping_sub(xlog_checkpoint.oldestXid) as i32)
< 0
{
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
}
trace!(
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
self.checkpoint.oldestActiveXid = oldest_active_xid;
} else {
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
// have some trace of the checkpoint records in the layer files at the same
// LSNs.
self.checkpoint_modified = true;
}
});
}
pg_constants::RM_LOGICALMSG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
@@ -424,7 +438,11 @@ impl WalIngest {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestActiveXid = xlrec.oldest_running_xid;
});
self.checkpoint_modified = true;
}
}
@@ -539,7 +557,7 @@ impl WalIngest {
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
// do not materialize null pages because them most likely be soon replaced with real data
@@ -1242,12 +1260,17 @@ impl WalIngest {
fn warn_on_ingest_lag(
&mut self,
conf: &crate::config::PageServerConf,
wal_timestmap: TimestampTz,
wal_timestamp: TimestampTz,
) {
debug_assert_current_span_has_tenant_and_timeline_id();
let now = SystemTime::now();
let rate_limits = &mut self.warn_ingest_lag;
match try_from_pg_timestamp(wal_timestmap) {
let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
});
match ts {
Ok(ts) => {
match now.duration_since(ts) {
Ok(lag) => {
@@ -1257,7 +1280,7 @@ impl WalIngest {
warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
})
}
},
}
Err(e) => {
let delta_t = e.duration();
// determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
@@ -1271,7 +1294,6 @@ impl WalIngest {
}
}
};
}
Err(error) => {
rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
@@ -1379,14 +1401,17 @@ impl WalIngest {
// truncated, but a checkpoint record with the updated values isn't written until
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
// so we keep the oldestXid and oldestXidDB up-to-date.
self.checkpoint.oldestXid = xlrec.oldest_xid;
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestXid = xlrec.oldest_xid;
cp.oldestXidDB = xlrec.oldest_xid_db;
});
self.checkpoint_modified = true;
// TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
let latest_page_number =
self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
/ pg_constants::CLOG_XACTS_PER_PAGE;
// Now delete all segments containing pages between xlrec.pageno
// and latest_page_number.
@@ -1394,7 +1419,9 @@ impl WalIngest {
// First, make an important safety check:
// the current endpoint page must not be eligible for removal.
// See SimpleLruTruncate() in slru.c
if clogpage_precedes(latest_page_number, xlrec.pageno) {
if dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno)
}) {
info!("could not truncate directory pg_xact apparent wraparound");
return Ok(());
}
@@ -1411,7 +1438,12 @@ impl WalIngest {
.await?
{
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno)
});
if may_delete {
modification
.drop_slru_segment(SlruKind::Clog, segno, ctx)
.await?;
@@ -1530,14 +1562,23 @@ impl WalIngest {
xlrec: &XlMultiXactTruncate,
ctx: &RequestContext,
) -> Result<()> {
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
let (maxsegment, startsegment, endsegment) =
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestMulti = xlrec.end_trunc_off;
cp.oldestMultiDB = xlrec.oldest_multi_db;
let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
pg_constants::MAX_MULTIXACT_OFFSET,
);
let startsegment: i32 =
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
let endsegment: i32 =
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
(maxsegment, startsegment, endsegment)
});
self.checkpoint_modified = true;
// PerformMembersTruncation
let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
let mut segment: i32 = startsegment;
// Delete all the segments except the last one. The last segment can still
@@ -1811,11 +1852,23 @@ mod tests {
// TODO
}
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
#[tokio::test]
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
for i in 14..=16 {
dispatch_pgversion!(i, {
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
});
}
Ok(())
}
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_checkpoint(dispatch_pgversion!(
tline.pg_version,
pgv::ZERO_CHECKPOINT.clone()
))?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit(ctx).await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;

View File

@@ -43,13 +43,12 @@ use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use utils::sync::heavier_once_cell;
/// The real implementation that uses a Postgres process to
/// perform WAL replay.
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
///
/// Only one thread can use the process at a time, that is controlled by the
/// Mutex. In the future, we might want to launch a pool of processes to allow
/// concurrent replay of multiple records.
pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,

View File

@@ -1038,9 +1038,12 @@ DetermineEpochStartLsn(WalProposer *wp)
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
{
/*
* However, allow to proceed if previously elected leader was me;
* plain restart of walproposer not intervened by concurrent
* compute (who could generate WAL) is ok.
* However, allow to proceed if last_log_term on the node which gave
* the highest vote (i.e. point where we are going to start writing)
* actually had been won by me; plain restart of walproposer not
* intervened by concurrent compute which wrote WAL is ok.
*
* This avoids compute crash after manual term_bump.
*/
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
@@ -1442,12 +1445,17 @@ RecvAppendResponses(Safekeeper *sk)
if (sk->appendResponse.term > wp->propTerm)
{
/*
* Another compute with higher term is running. Panic to restart
* PG as we likely need to retake basebackup. However, don't dump
* core as this is kinda expected scenario.
*
* Term has changed to higher one, probably another compute is
* running. If this is the case we could PANIC as well because
* likely it inserted some data and our basebackup is unsuitable
* anymore. However, we also bump term manually (term_bump endpoint)
* on safekeepers for migration purposes, in this case we do want
* compute to stay alive. So restart walproposer with FATAL instead
* of panicking; if basebackup is spoiled next election will notice
* this.
*/
disable_core_dump();
wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
}

View File

@@ -16,7 +16,7 @@ use tracing::debug;
// On the other hand, `hashlink` has good download stats and appears to be maintained.
use hashlink::{linked_hash_map::RawEntryMut, LruCache};
use super::{common::Cached, *};
use super::{common::Cached, timed_lru, Cache};
/// An implementation of timed LRU cache with fixed capacity.
/// Key properties:

View File

@@ -38,10 +38,7 @@ impl Api {
locks: &'static ApiLocks<EndpointCacheKey>,
wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
) -> Self {
let jwt = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
Ok(v) => v,
Err(_) => String::new(),
};
let jwt = std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN").unwrap_or_default();
Self {
endpoint,
caches,

View File

@@ -598,15 +598,15 @@ mod tests {
assert_eq!(
file_stats,
[
(1315874, 3, 6000),
(1315867, 3, 6000),
(1315927, 3, 6000),
(1315884, 3, 6000),
(1316014, 3, 6000),
(1315856, 3, 6000),
(1315648, 3, 6000),
(1315884, 3, 6000),
(438913, 1, 2000)
(1312632, 3, 6000),
(1312621, 3, 6000),
(1312680, 3, 6000),
(1312637, 3, 6000),
(1312773, 3, 6000),
(1312610, 3, 6000),
(1312404, 3, 6000),
(1312639, 3, 6000),
(437848, 1, 2000)
]
);
@@ -638,11 +638,11 @@ mod tests {
assert_eq!(
file_stats,
[
(1208861, 5, 10000),
(1208592, 5, 10000),
(1208885, 5, 10000),
(1208873, 5, 10000),
(1209128, 5, 10000)
(1203465, 5, 10000),
(1203189, 5, 10000),
(1203490, 5, 10000),
(1203475, 5, 10000),
(1203729, 5, 10000)
]
);
@@ -667,15 +667,15 @@ mod tests {
assert_eq!(
file_stats,
[
(1315874, 3, 6000),
(1315867, 3, 6000),
(1315927, 3, 6000),
(1315884, 3, 6000),
(1316014, 3, 6000),
(1315856, 3, 6000),
(1315648, 3, 6000),
(1315884, 3, 6000),
(438913, 1, 2000)
(1312632, 3, 6000),
(1312621, 3, 6000),
(1312680, 3, 6000),
(1312637, 3, 6000),
(1312773, 3, 6000),
(1312610, 3, 6000),
(1312404, 3, 6000),
(1312639, 3, 6000),
(437848, 1, 2000)
]
);
@@ -712,7 +712,7 @@ mod tests {
// files are smaller than the size threshold, but they took too long to fill so were flushed early
assert_eq!(
file_stats,
[(659836, 2, 3001), (659550, 2, 3000), (659346, 2, 2999)]
[(657696, 2, 3001), (657410, 2, 3000), (657206, 2, 2999)]
);
tmpdir.close().unwrap();

View File

@@ -44,16 +44,14 @@
clippy::items_after_statements,
)]
// List of temporarily allowed lints.
// TODO: Switch to except() once stable with 1.81.
// TODO: fix code and reduce list or move to permanent list above.
#![allow(
#![expect(
clippy::cargo_common_metadata,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::doc_markdown,
clippy::implicit_hasher,
clippy::inline_always,
clippy::match_same_arms,
clippy::match_wild_err_arm,
@@ -61,21 +59,28 @@
clippy::missing_panics_doc,
clippy::module_name_repetitions,
clippy::needless_pass_by_value,
clippy::needless_raw_string_hashes,
clippy::redundant_closure_for_method_calls,
clippy::return_self_not_must_use,
clippy::similar_names,
clippy::single_match_else,
clippy::struct_excessive_bools,
clippy::struct_field_names,
clippy::too_many_lines,
clippy::unreadable_literal,
clippy::unused_async,
clippy::unused_self,
clippy::wildcard_imports
clippy::unused_self
)]
#![cfg_attr(
any(test, feature = "testing"),
allow(
clippy::needless_raw_string_hashes,
clippy::unreadable_literal,
clippy::unused_async,
)
)]
// List of temporarily allowed lints to unblock beta/nightly.
#![allow(unknown_lints, clippy::manual_inspect)]
#![allow(
unknown_lints,
// TODO: 1.82: Add `use<T>` where necessary and remove from this list.
impl_trait_overcaptures,
)]
use std::{convert::Infallible, future::Future};

View File

@@ -217,6 +217,7 @@ impl sasl::Mechanism for Exchange<'_> {
self.state = ExchangeState::SaltSent(sent);
Ok(Step::Continue(self, msg))
}
#[allow(unreachable_patterns)] // TODO: 1.82: simply drop this match
Step::Success(x, _) => match x {},
Step::Failure(msg) => Ok(Step::Failure(msg)),
}
@@ -224,6 +225,7 @@ impl sasl::Mechanism for Exchange<'_> {
ExchangeState::SaltSent(sent) => {
match sent.transition(self.secret, &self.tls_server_end_point, input)? {
Step::Success(keys, msg) => Ok(Step::Success(keys, msg)),
#[allow(unreachable_patterns)] // TODO: 1.82: simply drop this match
Step::Continue(x, _) => match x {},
Step::Failure(msg) => Ok(Step::Failure(msg)),
}

View File

@@ -745,22 +745,20 @@ impl BatchQueryData {
builder = builder.deferrable(true);
}
let transaction = builder.start().await.map_err(|e| {
let transaction = builder.start().await.inspect_err(|_| {
// if we cannot start a transaction, we should return immediately
// and not return to the pool. connection is clearly broken
discard.discard();
e
})?;
let json_output =
match query_batch(cancel.child_token(), &transaction, self, parsed_headers).await {
Ok(json_output) => {
info!("commit");
let status = transaction.commit().await.map_err(|e| {
let status = transaction.commit().await.inspect_err(|_| {
// if we cannot commit - for now don't return connection to pool
// TODO: get a query status from the error
discard.discard();
e
})?;
discard.check_idle(status);
json_output
@@ -776,11 +774,10 @@ impl BatchQueryData {
}
Err(err) => {
info!("rollback");
let status = transaction.rollback().await.map_err(|e| {
let status = transaction.rollback().await.inspect_err(|_| {
// if we cannot rollback - for now don't return connection to pool
// TODO: get a query status from the error
discard.discard();
e
})?;
discard.check_idle(status);
return Err(err);

View File

@@ -14,6 +14,7 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;
/// Stream wrapper which implements libpq's protocol.
///
/// NOTE: This object deliberately doesn't implement [`AsyncRead`]
/// or [`AsyncWrite`] to prevent subtle errors (e.g. trying
/// to pass random malformed bytes through the connection).

View File

@@ -1,7 +1,7 @@
[toolchain]
channel = "1.80.1"
channel = "1.81.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html
# but we also need `llvm-tools-preview` for coverage data merges on CI
components = ["llvm-tools-preview", "rustfmt", "clippy"]
# but we also need `llvm-tools` for coverage data merges on CI
components = ["llvm-tools", "rustfmt", "clippy"]

View File

@@ -1,6 +1,9 @@
use utils::auth::{AuthError, Claims, Scope};
use utils::id::TenantId;
/// If tenant_id is provided, allow if token (claims) is for this tenant or
/// whole safekeeper scope (SafekeeperData). Else, allow only if token is
/// SafekeeperData.
pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<(), AuthError> {
match (&claims.scope, tenant_id) {
(Scope::Tenant, None) => Err(AuthError(

View File

@@ -19,7 +19,7 @@ use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use storage_broker::Uri;
use tracing::*;
@@ -261,6 +261,15 @@ async fn main() -> anyhow::Result<()> {
// Change into the data directory.
std::env::set_current_dir(&workdir)?;
// Prevent running multiple safekeepers on the same directory
let lock_file_path = workdir.join(PID_FILE_NAME);
let lock_file =
pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
info!("claimed pid file at {lock_file_path:?}");
// ensure that the lock file is held even if the main thread of the process is panics
// we need to release the lock file only when the current process is gone
std::mem::forget(lock_file);
// Set or read our ID.
let id = set_id(&workdir, args.id.map(NodeId))?;
if args.init {
@@ -364,15 +373,15 @@ async fn main() -> anyhow::Result<()> {
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
// Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file =
pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
info!("claimed pid file at {lock_file_path:?}");
// ensure that the lock file is held even if the main thread of the process is panics
// we need to release the lock file only when the current process is gone
std::mem::forget(lock_file);
// fsync the datadir to make sure we have a consistent state on disk.
let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
let started = Instant::now();
utils::crashsafe::syncfs(dfd)?;
let elapsed = started.elapsed();
info!(
elapsed_ms = elapsed.as_millis(),
"syncfs data directory done"
);
info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {

View File

@@ -18,8 +18,8 @@ use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWri
use utils::http::request::parse_query_param;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
use utils::{
auth::SwappableJwtAuth,
http::{
@@ -408,6 +408,28 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respons
json_response(StatusCode::OK, response)
}
/// Make term at least as high as one in request. If one in request is None,
/// increment current one.
async fn timeline_term_bump_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let response = tli
.term_bump(request_data.term)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
}
/// Used only in tests to hand craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -630,6 +652,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
|r| request_span(r, timeline_backup_partial_reset),
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
|r| request_span(r, timeline_term_bump_handler),
)
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)
})

View File

@@ -484,6 +484,7 @@ pub async fn validate_temp_timeline(
}
/// Move timeline from a temp directory to the main storage, and load it to the global map.
///
/// This operation is done under a lock to prevent bugs if several concurrent requests are
/// trying to load the same timeline. Note that it doesn't guard against creating the
/// timeline with the same ttid, but no one should be doing this anyway.

View File

@@ -448,8 +448,10 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx; reading from socket and writing to disk in parallel is
/// beneficial for performance, this struct provides writing to disk part.
/// replies to reply_tx.
///
/// Reading from socket and writing to disk in parallel is beneficial for
/// performance, this struct provides the writing to disk part.
pub struct WalAcceptor {
tli: WalResidentTimeline,
msg_rx: Receiver<ProposerAcceptorMessage>,

View File

@@ -938,8 +938,9 @@ where
}
trace!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
"processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
msg.wal_data.len(),
msg.h.begin_lsn,
msg.h.end_lsn,
msg.h.commit_lsn,
msg.h.truncate_lsn,

View File

@@ -758,9 +758,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
// pq_sendint32(&reply_message, xmin);
// pq_sendint32(&reply_message, xmin_epoch);
// So it is two big endian 32-bit words in low endian order!
hs_feedback.xmin = (hs_feedback.xmin >> 32) | (hs_feedback.xmin << 32);
hs_feedback.catalog_xmin =
(hs_feedback.catalog_xmin >> 32) | (hs_feedback.catalog_xmin << 32);
hs_feedback.xmin = hs_feedback.xmin.rotate_left(32);
hs_feedback.catalog_xmin = hs_feedback.catalog_xmin.rotate_left(32);
self.ws_guard
.walsenders
.record_hs_feedback(self.ws_guard.id, &hs_feedback);

View File

@@ -1,9 +1,10 @@
//! Defines per timeline data stored persistently (SafeKeeperPersistentState)
//! and its wrapper with in memory layer (SafekeeperState).
use std::ops::Deref;
use std::{cmp::max, ops::Deref};
use anyhow::Result;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -12,7 +13,7 @@ use utils::{
use crate::{
control_file,
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
wal_backup_partial::{self},
};
@@ -147,9 +148,11 @@ pub struct TimelineMemState {
pub proposer_uuid: PgUuid,
}
/// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
/// when we update fields like commit_lsn which don't need immediate
/// persistence. Provides transactional like API to atomically update the state.
/// Safekeeper persistent state plus in memory layer.
///
/// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
/// which don't need immediate persistence. Provides transactional like API
/// to atomically update the state.
///
/// Implements Deref into *persistent* part.
pub struct TimelineState<CTRL: control_file::Storage> {
@@ -209,6 +212,27 @@ where
let s = self.start_change();
self.finish_change(&s).await
}
/// Make term at least as `to`. If `to` is None, increment current one. This
/// is not in safekeeper.rs because we want to be able to do it even if
/// timeline is offloaded.
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let before = self.acceptor_state.term;
let mut state = self.start_change();
let new = match to {
Some(to) => max(state.acceptor_state.term, to),
None => state.acceptor_state.term + 1,
};
if new > state.acceptor_state.term {
state.acceptor_state.term = new;
self.finish_change(&state).await?;
}
let after = self.acceptor_state.term;
Ok(TimelineTermBumpResponse {
previous_term: before,
current_term: after,
})
}
}
impl<CTRL> Deref for TimelineState<CTRL>

View File

@@ -4,6 +4,7 @@
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
@@ -169,6 +170,7 @@ impl<'a> Drop for WriteGuardSharedState<'a> {
}
/// This structure is stored in shared state and represents the state of the timeline.
///
/// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
/// case, SafeKeeper is not available (because WAL is not present on disk) and all
/// operations can be done only with control file.
@@ -214,6 +216,10 @@ impl StateSK {
.get_last_log_term(self.flush_lsn())
}
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
self.state_mut().term_bump(to).await
}
/// Close open WAL files to release FDs.
fn close_wal_store(&mut self) {
if let StateSK::Loaded(sk) = self {
@@ -853,6 +859,11 @@ impl Timeline {
Ok(res)
}
pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let mut state = self.write_shared_state().await;
state.sk.term_bump(to).await
}
/// Get the timeline guard for reading/writing WAL files.
/// If WAL files are not present on disk (evicted), they will be automatically
/// downloaded from remote storage. This is done in the manager task, which is

View File

@@ -1,6 +1,8 @@
//! Code related to evicting WAL files to remote storage. The actual upload is done by the
//! partial WAL backup code. This file has code to delete and re-download WAL files,
//! cross-validate with partial WAL backup if local file is still present.
//! Code related to evicting WAL files to remote storage.
//!
//! The actual upload is done by the partial WAL backup code. This file has
//! code to delete and re-download WAL files, cross-validate with partial WAL
//! backup if local file is still present.
use anyhow::Context;
use camino::Utf8PathBuf;

View File

@@ -1,4 +1,6 @@
//! Timeline residence guard is needed to ensure that WAL segments are present on disk,
//! Timeline residence guard
//!
//! It is needed to ensure that WAL segments are present on disk,
//! as long as the code is holding the guard. This file implements guard logic, to issue
//! and drop guards, and to notify the manager when the guard is dropped.

View File

@@ -1,4 +1,5 @@
//! The timeline manager task is responsible for managing the timeline's background tasks.
//!
//! It is spawned alongside each timeline and exits when the timeline is deleted.
//! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
//! It also can manage some reactive state, like should the timeline be active for broker pushes or not.

View File

@@ -60,7 +60,8 @@ impl TimelinesSet {
}
}
/// Guard is used to add or remove timeline from the set.
/// Guard is used to add or remove timelines from the set.
///
/// If the timeline present in set, it will be removed from it on drop.
/// Note: do not use more than one guard for the same timeline, it caches the presence state.
/// It is designed to be used in the manager task only.

View File

@@ -1,6 +1,8 @@
//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
//! was changed), the segment will be uploaded to S3 in about 15 minutes.
//! and `flush_lsn` updates.
//!
//! After the partial segment was updated (`flush_lsn` was changed), the segment
//! will be uploaded to S3 within the configured `partial_backup_timeout`.
//!
//! The filename format for partial segments is
//! `Segment_Term_Flush_Commit_skNN.partial`, where:

View File

@@ -17,6 +17,7 @@ use crate::SafeKeeperConf;
use postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
///
/// allowed_auth_scope is either SafekeeperData (wide JWT tokens giving access
/// to any tenant are allowed) or Tenant (only tokens giving access to specific
/// tenant are allowed). Doesn't matter if auth is disabled in conf.

View File

@@ -98,7 +98,19 @@ pub struct PhysicalStorage {
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
write_lsn: Lsn,
/// The LSN of the last WAL record written to disk. Still can be not fully flushed.
/// The LSN of the last WAL record written to disk. Still can be not fully
/// flushed.
///
/// Note: Normally it (and flush_record_lsn) is <= write_lsn, but after xlog
/// switch ingest the reverse is true because we don't bump write_lsn up to
/// the next segment: WAL stream from the compute doesn't have the gap and
/// for simplicity / as a sanity check we disallow any non-sequential
/// writes, so write zeros as is.
///
/// Similar effect is in theory possible due to LSN alignment: if record
/// ends at *2, decoder will report end lsn as *8 even though we haven't
/// written these zeros yet. In practice compute likely never sends
/// non-aligned chunks of data.
write_record_lsn: Lsn,
/// The LSN of the last WAL record flushed to disk.
@@ -167,8 +179,7 @@ impl PhysicalStorage {
)
};
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
// note: this assumes we fsync'ed whole datadir on start.
let flush_lsn = write_lsn;
debug!(
@@ -440,11 +451,12 @@ impl Storage for PhysicalStorage {
.with_label_values(&["truncate_wal"])
.start_timer();
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
// Streaming must not create a hole, so truncate cannot be called on
// non-written lsn.
if self.write_record_lsn != Lsn(0) && end_pos > self.write_record_lsn {
bail!(
"truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
self.write_lsn,
"truncate_wal called on non-written WAL, write_record_lsn={}, end_pos={}",
self.write_record_lsn,
end_pos
);
}

View File

@@ -134,7 +134,7 @@ class LLVM:
# Show a user-friendly warning
raise Exception(' '.join([
f"It appears that you don't have `{name}` installed.",
"Please execute `rustup component add llvm-tools-preview`,",
"Please execute `rustup component add llvm-tools`,",
"or install it via your package manager of choice.",
"LLVM tools should be the same version as LLVM in `rustc --version --verbose`.",
]))
@@ -518,7 +518,7 @@ def main() -> None:
example = f"""
prerequisites:
# alternatively, install a system package for `llvm-tools`
rustup component add llvm-tools-preview
rustup component add llvm-tools
self-contained example:
{app} run make

View File

@@ -0,0 +1 @@
ALTER TABLE nodes ALTER availability_zone_id DROP NOT NULL;

View File

@@ -0,0 +1 @@
ALTER TABLE nodes ALTER availability_zone_id SET NOT NULL;

View File

@@ -0,0 +1 @@
ALTER TABLE tenant_shards DROP preferred_az_id;

View File

@@ -0,0 +1 @@
ALTER TABLE tenant_shards ADD preferred_az_id VARCHAR;

View File

@@ -14,14 +14,14 @@ use metrics::{BuildInfo, NeonMetrics};
use pageserver_api::controller_api::{
MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
TenantCreateRequest,
ShardsPreferredAzsRequest, TenantCreateRequest,
};
use pageserver_api::models::{
TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use pageserver_client::{mgmt_api, BlockUnblock};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
@@ -369,6 +369,23 @@ async fn handle_tenant_timeline_detach_ancestor(
json_response(StatusCode::OK, res)
}
async fn handle_tenant_timeline_block_unblock_gc(
service: Arc<Service>,
req: Request<Body>,
dir: BlockUnblock,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
service
.tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
.await?;
json_response(StatusCode::OK, ())
}
async fn handle_tenant_timeline_passthrough(
service: Arc<Service>,
req: Request<Body>,
@@ -539,6 +556,17 @@ async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiErr
json_response(StatusCode::OK, node_status)
}
async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let node_status = state.service.get_node_shards(node_id).await?;
json_response(StatusCode::OK, node_status)
}
async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -688,6 +716,18 @@ async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<
)
}
async fn handle_update_preferred_azs(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state.service.update_shards_preferred_azs(azs_req).await?,
)
}
async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -1097,6 +1137,13 @@ pub fn make_router(
.get("/control/v1/node/:node_id", |r| {
named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
})
.get("/control/v1/node/:node_id/shards", |r| {
named_request_span(
r,
handle_node_shards,
RequestName("control_v1_node_describe"),
)
})
.get("/control/v1/leader", |r| {
named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
})
@@ -1174,6 +1221,13 @@ pub fn make_router(
RequestName("control_v1_tenant_policy"),
)
})
.put("/control/v1/preferred_azs", |r| {
named_request_span(
r,
handle_update_preferred_azs,
RequestName("control_v1_preferred_azs"),
)
})
.put("/control/v1/step_down", |r| {
named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
})
@@ -1255,6 +1309,26 @@ pub fn make_router(
)
},
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
|r| {
tenant_service_handler(
r,
|s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
RequestName("v1_tenant_timeline_block_unblock_gc"),
)
},
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
|r| {
tenant_service_handler(
r,
|s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
RequestName("v1_tenant_timeline_block_unblock_gc"),
)
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(

View File

@@ -36,7 +36,7 @@ pub(crate) struct Node {
listen_pg_addr: String,
listen_pg_port: u16,
availability_zone_id: Option<String>,
availability_zone_id: String,
// This cancellation token means "stop any RPCs in flight to this node, and don't start
// any more". It is not related to process shutdown.
@@ -63,8 +63,9 @@ impl Node {
self.id
}
pub(crate) fn get_availability_zone_id(&self) -> Option<&str> {
self.availability_zone_id.as_deref()
#[allow(unused)]
pub(crate) fn get_availability_zone_id(&self) -> &str {
self.availability_zone_id.as_str()
}
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
@@ -78,22 +79,12 @@ impl Node {
/// Does this registration request match `self`? This is used when deciding whether a registration
/// request should be allowed to update an existing record with the same node ID.
pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
let az_ids_match = {
match (
self.availability_zone_id.as_deref(),
register_req.availability_zone_id.as_deref(),
) {
(Some(current_az), Some(register_req_az)) => current_az == register_req_az,
_ => true,
}
};
az_ids_match
&& self.id == register_req.node_id
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
&& self.listen_http_port == register_req.listen_http_port
&& self.listen_pg_addr == register_req.listen_pg_addr
&& self.listen_pg_port == register_req.listen_pg_port
&& self.availability_zone_id == register_req.availability_zone_id
}
/// For a shard located on this node, populate a response object
@@ -190,7 +181,7 @@ impl Node {
listen_http_port: u16,
listen_pg_addr: String,
listen_pg_port: u16,
availability_zone_id: Option<String>,
availability_zone_id: String,
) -> Self {
Self {
id,

View File

@@ -7,7 +7,10 @@ use pageserver_api::{
},
shard::TenantShardId,
};
use pageserver_client::mgmt_api::{Client, Result};
use pageserver_client::{
mgmt_api::{Client, Result},
BlockUnblock,
};
use reqwest::StatusCode;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -258,6 +261,24 @@ impl PageserverClient {
)
}
pub(crate) async fn timeline_block_unblock_gc(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
dir: BlockUnblock,
) -> Result<()> {
// measuring these makes no sense because we synchronize with the gc loop and remote
// storage on block_gc so there should be huge outliers
measured_request!(
"timeline_block_unblock_gc",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
.await
)
}
pub(crate) async fn get_utilization(&self) -> Result<PageserverUtilization> {
measured_request!(
"utilization",

Some files were not shown because too many files have changed in this diff Show More